Posted to commits@spark.apache.org by we...@apache.org on 2020/02/13 15:59:14 UTC

[spark] branch branch-3.0 updated: [SPARK-30762] Add dtype=float32 support to vector_to_array UDF

This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 074712e  [SPARK-30762] Add dtype=float32 support to vector_to_array UDF
074712e is described below

commit 074712e329b347f769f8c009949c7845e95b3212
Author: Liang Zhang <li...@databricks.com>
AuthorDate: Thu Feb 13 23:55:13 2020 +0800

    [SPARK-30762] Add dtype=float32 support to vector_to_array UDF
    
    ### What changes were proposed in this pull request?
    In this PR, we add a parameter to the Python function vector_to_array(col) that allows converting a vector column to a column of 32-bit float arrays in Scala, which maps to NumPy arrays of dtype=float32 on the Python side.
    
    ### Why are the changes needed?
    In downstream ML training, using float32 instead of float64 (the default) allows a larger batch size, i.e., more data can fit in memory.
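
    As a rough illustration of the memory argument (not part of this patch, and assuming NumPy is installed), halving the element width halves the array payload:
    ```
    import numpy as np

    # A dense feature vector with one million entries.
    n = 1_000_000
    as_float64 = np.zeros(n, dtype=np.float64)  # 8 bytes per element
    as_float32 = np.zeros(n, dtype=np.float32)  # 4 bytes per element

    print(as_float64.nbytes)  # 8000000 bytes
    print(as_float32.nbytes)  # 4000000 bytes
    ```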
    
    ### Does this PR introduce any user-facing change?
    Yes.
    Old: `vector_to_array()` only takes one parameter
    ```
    df.select(vector_to_array("colA"), ...)
    ```
    New: `vector_to_array()` can take an additional optional parameter: `dtype` = "float32" (or "float64")
    ```
    df.select(vector_to_array("colA", "float32"), ...)
    ```
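
    A minimal end-to-end sketch of the new parameter (not part of this patch; it assumes an active `SparkSession` named `spark`, and the column name `colA` is illustrative):
    ```
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.functions import vector_to_array

    df = spark.createDataFrame(
        [(Vectors.dense(1.0, 2.0, 3.0),),
         (Vectors.sparse(3, [(0, 2.0), (2, 3.0)]),)],
        ["colA"])

    # Default dtype ("float64") yields array<double>; "float32" yields array<float>.
    out = df.select(
        vector_to_array("colA").alias("doubles"),
        vector_to_array("colA", "float32").alias("floats"))
    out.printSchema()
    # root
    #  |-- doubles: array (nullable = false)
    #  |    |-- element: double (containsNull = false)
    #  |-- floats: array (nullable = false)
    #  |    |-- element: float (containsNull = false)
    ```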
    
    ### How was this patch tested?
    Unit test in Scala.
    Doctest in Python.
    
    Closes #27522 from liangz1/udf-float32.
    
    Authored-by: Liang Zhang <li...@databricks.com>
    Signed-off-by: WeichenXu <we...@databricks.com>
    (cherry picked from commit 82d0aa37ae521231d8067e473c6ea79a118a115a)
    Signed-off-by: WeichenXu <we...@databricks.com>
---
 .../main/scala/org/apache/spark/ml/functions.scala | 34 +++++++++++++++++++---
 .../scala/org/apache/spark/ml/FunctionsSuite.scala | 33 ++++++++++++++++++---
 python/pyspark/ml/functions.py                     | 27 +++++++++++++----
 3 files changed, 81 insertions(+), 13 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/functions.scala b/mllib/src/main/scala/org/apache/spark/ml/functions.scala
index 1faf562..0f03231 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/functions.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/functions.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.ml
 
 import org.apache.spark.annotation.Since
-import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.linalg.{SparseVector, Vector}
 import org.apache.spark.mllib.linalg.{Vector => OldVector}
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.functions.udf
@@ -27,7 +27,6 @@ import org.apache.spark.sql.functions.udf
 @Since("3.0.0")
 object functions {
 // scalastyle:on
-
   private val vectorToArrayUdf = udf { vec: Any =>
     vec match {
       case v: Vector => v.toArray
@@ -39,10 +38,37 @@ object functions {
     }
   }.asNonNullable()
 
+  private val vectorToArrayFloatUdf = udf { vec: Any =>
+    vec match {
+      case v: SparseVector =>
+        val data = new Array[Float](v.size)
+        v.foreachActive { (index, value) => data(index) = value.toFloat }
+        data
+      case v: Vector => v.toArray.map(_.toFloat)
+      case v: OldVector => v.toArray.map(_.toFloat)
+      case v => throw new IllegalArgumentException(
+        "function vector_to_array requires a non-null input argument and input type must be " +
+        "`org.apache.spark.ml.linalg.Vector` or `org.apache.spark.mllib.linalg.Vector`, " +
+        s"but got ${ if (v == null) "null" else v.getClass.getName }.")
+    }
+  }.asNonNullable()
+
   /**
    * Converts a column of MLlib sparse/dense vectors into a column of dense arrays.
-   *
+   * @param v: the column of MLlib sparse/dense vectors
+   * @param dtype: the desired underlying data type in the returned array
+   * @return an array&lt;float&gt; if dtype is float32, or array&lt;double&gt; if dtype is float64
    * @since 3.0.0
    */
-  def vector_to_array(v: Column): Column = vectorToArrayUdf(v)
+  def vector_to_array(v: Column, dtype: String = "float64"): Column = {
+    if (dtype == "float64") {
+      vectorToArrayUdf(v)
+    } else if (dtype == "float32") {
+      vectorToArrayFloatUdf(v)
+    } else {
+      throw new IllegalArgumentException(
+        s"Unsupported dtype: $dtype. Valid values: float64, float32."
+      )
+    }
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala
index 2f5062c..3dd9a7d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala
@@ -34,9 +34,8 @@ class FunctionsSuite extends MLTest {
       (Vectors.sparse(3, Seq((0, 2.0), (2, 3.0))), OldVectors.sparse(3, Seq((0, 20.0), (2, 30.0))))
     ).toDF("vec", "oldVec")
 
-      val result = df.select(vector_to_array('vec), vector_to_array('oldVec))
-      .as[(Seq[Double], Seq[Double])]
-      .collect().toSeq
+    val result = df.select(vector_to_array('vec), vector_to_array('oldVec))
+                   .as[(Seq[Double], Seq[Double])].collect().toSeq
 
     val expected = Seq(
       (Seq(1.0, 2.0, 3.0), Seq(10.0, 20.0, 30.0)),
@@ -50,7 +49,6 @@ class FunctionsSuite extends MLTest {
       (null, null, 0)
     ).toDF("vec", "oldVec", "label")
 
-
     for ((colName, valType) <- Seq(
         ("vec", "null"), ("oldVec", "null"), ("label", "java.lang.Integer"))) {
       val thrown1 = intercept[SparkException] {
@@ -61,5 +59,32 @@ class FunctionsSuite extends MLTest {
         "`org.apache.spark.ml.linalg.Vector` or `org.apache.spark.mllib.linalg.Vector`, " +
         s"but got ${valType}"))
     }
+
+    val df3 = Seq(
+      (Vectors.dense(1.0, 2.0, 3.0), OldVectors.dense(10.0, 20.0, 30.0)),
+      (Vectors.sparse(3, Seq((0, 2.0), (2, 3.0))), OldVectors.sparse(3, Seq((0, 20.0), (2, 30.0))))
+    ).toDF("vec", "oldVec")
+    val dfArrayFloat = df3.select(
+      vector_to_array('vec, dtype = "float32"), vector_to_array('oldVec, dtype = "float32"))
+
+    // Check values are correct
+    val result3 = dfArrayFloat.as[(Seq[Float], Seq[Float])].collect().toSeq
+
+    val expected3 = Seq(
+      (Seq(1.0, 2.0, 3.0), Seq(10.0, 20.0, 30.0)),
+      (Seq(2.0, 0.0, 3.0), Seq(20.0, 0.0, 30.0))
+    )
+    assert(result3 === expected3)
+
+    // Check data types are correct
+    assert(dfArrayFloat.schema.simpleString ===
+      "struct<UDF(vec):array<float>,UDF(oldVec):array<float>>")
+
+    val thrown2 = intercept[IllegalArgumentException] {
+      df3.select(
+        vector_to_array('vec, dtype = "float16"), vector_to_array('oldVec, dtype = "float16"))
+    }
+    assert(thrown2.getMessage.contains(
+      s"Unsupported dtype: float16. Valid values: float64, float32."))
   }
 }
diff --git a/python/pyspark/ml/functions.py b/python/pyspark/ml/functions.py
index 2b4d8dd..ec164f3 100644
--- a/python/pyspark/ml/functions.py
+++ b/python/pyspark/ml/functions.py
@@ -19,10 +19,15 @@ from pyspark import since, SparkContext
 from pyspark.sql.column import Column, _to_java_column
 
 
-@since(3.0)
-def vector_to_array(col):
+@since("3.0.0")
+def vector_to_array(col, dtype="float64"):
     """
     Converts a column of MLlib sparse/dense vectors into a column of dense arrays.
+    :param col: A string of the column name or a Column
+    :param dtype: The data type of the output array. Valid values: "float64" or "float32".
+    :return: The converted column of dense arrays.
+
+    .. versionadded:: 3.0.0
 
     >>> from pyspark.ml.linalg import Vectors
     >>> from pyspark.ml.functions import vector_to_array
@@ -32,14 +37,26 @@ def vector_to_array(col):
     ...     (Vectors.sparse(3, [(0, 2.0), (2, 3.0)]),
     ...      OldVectors.sparse(3, [(0, 20.0), (2, 30.0)]))],
     ...     ["vec", "oldVec"])
-    >>> df.select(vector_to_array("vec").alias("vec"),
-    ...           vector_to_array("oldVec").alias("oldVec")).collect()
+    >>> df1 = df.select(vector_to_array("vec").alias("vec"),
+    ...                 vector_to_array("oldVec").alias("oldVec"))
+    >>> df1.collect()
+    [Row(vec=[1.0, 2.0, 3.0], oldVec=[10.0, 20.0, 30.0]),
+     Row(vec=[2.0, 0.0, 3.0], oldVec=[20.0, 0.0, 30.0])]
+    >>> df2 = df.select(vector_to_array("vec", "float32").alias("vec"),
+    ...                 vector_to_array("oldVec", "float32").alias("oldVec"))
+    >>> df2.collect()
     [Row(vec=[1.0, 2.0, 3.0], oldVec=[10.0, 20.0, 30.0]),
      Row(vec=[2.0, 0.0, 3.0], oldVec=[20.0, 0.0, 30.0])]
+    >>> df1.schema.fields
+    [StructField(vec,ArrayType(DoubleType,false),false),
+    StructField(oldVec,ArrayType(DoubleType,false),false)]
+    >>> df2.schema.fields
+    [StructField(vec,ArrayType(FloatType,false),false),
+    StructField(oldVec,ArrayType(FloatType,false),false)]
     """
     sc = SparkContext._active_spark_context
     return Column(
-        sc._jvm.org.apache.spark.ml.functions.vector_to_array(_to_java_column(col)))
+        sc._jvm.org.apache.spark.ml.functions.vector_to_array(_to_java_column(col), dtype))
 
 
 def _test():

