Posted to commits@spark.apache.org by me...@apache.org on 2020/01/07 00:19:23 UTC

[spark] branch master updated: [SPARK-30154][ML] PySpark UDF to convert MLlib vectors to dense arrays

This is an automated email from the ASF dual-hosted git repository.

meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 88542bc  [SPARK-30154][ML] PySpark UDF to convert MLlib vectors to dense arrays
88542bc is described below

commit 88542bc3d9e506b1a0e852f3e9c632920d3fe553
Author: WeichenXu <we...@databricks.com>
AuthorDate: Mon Jan 6 16:18:51 2020 -0800

    [SPARK-30154][ML] PySpark UDF to convert MLlib vectors to dense arrays
    
    ### What changes were proposed in this pull request?
    
    PySpark UDF to convert MLlib vectors to dense arrays.
    Example:
    ```
    from pyspark.ml.functions import vector_to_array
    from pyspark.sql.functions import col
    df.select(vector_to_array(col("features")))
    ```
    
    ### Why are the changes needed?
    If a PySpark user wants to convert MLlib sparse/dense vectors in a DataFrame into dense arrays, an efficient approach is to do the conversion in the JVM. However, that requires the PySpark user to write Scala code and register it as a UDF, which is often infeasible for a pure Python project.
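
    For comparison, here is a sketch of the pure-Python workaround (hypothetical, not part of this patch): it converts each vector with a regular Python UDF, so every vector must be deserialized into Python and converted row by row, which is exactly the overhead the JVM-side UDF avoids.
    ```
    from pyspark.sql.functions import udf, col
    from pyspark.sql.types import ArrayType, DoubleType

    # Hypothetical pure-Python workaround: each MLlib vector is converted to a
    # Python list in the Python worker, which is much slower than the JVM-side
    # vector_to_array UDF added by this patch.
    to_array = udf(lambda v: v.toArray().tolist() if v is not None else None,
                   ArrayType(DoubleType()))
    df.select(to_array(col("features")))
    ```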
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    UT.
    
    Closes #26910 from WeichenXu123/vector_to_array.
    
    Authored-by: WeichenXu <we...@databricks.com>
    Signed-off-by: Xiangrui Meng <me...@databricks.com>
---
 dev/sparktestsupport/modules.py                    |  1 +
 .../main/scala/org/apache/spark/ml/functions.scala | 48 +++++++++++++++
 .../scala/org/apache/spark/ml/FunctionsSuite.scala | 65 +++++++++++++++++++++
 python/docs/pyspark.ml.rst                         |  8 +++
 python/pyspark/ml/functions.py                     | 68 ++++++++++++++++++++++
 5 files changed, 190 insertions(+)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 1443584..4179359 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -460,6 +460,7 @@ pyspark_ml = Module(
         "pyspark.ml.evaluation",
         "pyspark.ml.feature",
         "pyspark.ml.fpm",
+        "pyspark.ml.functions",
         "pyspark.ml.image",
         "pyspark.ml.linalg.__init__",
         "pyspark.ml.recommendation",
diff --git a/mllib/src/main/scala/org/apache/spark/ml/functions.scala b/mllib/src/main/scala/org/apache/spark/ml/functions.scala
new file mode 100644
index 0000000..1faf562
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/functions.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml
+
+import org.apache.spark.annotation.Since
+import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.mllib.linalg.{Vector => OldVector}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.functions.udf
+
+// scalastyle:off
+@Since("3.0.0")
+object functions {
+// scalastyle:on
+
+  private val vectorToArrayUdf = udf { vec: Any =>
+    vec match {
+      case v: Vector => v.toArray
+      case v: OldVector => v.toArray
+      case v => throw new IllegalArgumentException(
+        "function vector_to_array requires a non-null input argument and input type must be " +
+        "`org.apache.spark.ml.linalg.Vector` or `org.apache.spark.mllib.linalg.Vector`, " +
+        s"but got ${ if (v == null) "null" else v.getClass.getName }.")
+    }
+  }.asNonNullable()
+
+  /**
+   * Converts a column of MLlib sparse/dense vectors into a column of dense arrays.
+   *
+   * @since 3.0.0
+   */
+  def vector_to_array(v: Column): Column = vectorToArrayUdf(v)
+}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala
new file mode 100644
index 0000000..2f5062c
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml
+
+import org.apache.spark.SparkException
+import org.apache.spark.ml.functions.vector_to_array
+import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.ml.util.MLTest
+import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
+import org.apache.spark.sql.functions.col
+
+class FunctionsSuite extends MLTest {
+
+  import testImplicits._
+
+  test("test vector_to_array") {
+    val df = Seq(
+      (Vectors.dense(1.0, 2.0, 3.0), OldVectors.dense(10.0, 20.0, 30.0)),
+      (Vectors.sparse(3, Seq((0, 2.0), (2, 3.0))), OldVectors.sparse(3, Seq((0, 20.0), (2, 30.0))))
+    ).toDF("vec", "oldVec")
+
+      val result = df.select(vector_to_array('vec), vector_to_array('oldVec))
+      .as[(Seq[Double], Seq[Double])]
+      .collect().toSeq
+
+    val expected = Seq(
+      (Seq(1.0, 2.0, 3.0), Seq(10.0, 20.0, 30.0)),
+      (Seq(2.0, 0.0, 3.0), Seq(20.0, 0.0, 30.0))
+    )
+    assert(result === expected)
+
+    val df2 = Seq(
+      (Vectors.dense(1.0, 2.0, 3.0),
+       OldVectors.dense(10.0, 20.0, 30.0), 1),
+      (null, null, 0)
+    ).toDF("vec", "oldVec", "label")
+
+
+    for ((colName, valType) <- Seq(
+        ("vec", "null"), ("oldVec", "null"), ("label", "java.lang.Integer"))) {
+      val thrown1 = intercept[SparkException] {
+        df2.select(vector_to_array(col(colName))).count
+      }
+      assert(thrown1.getCause.getMessage.contains(
+        "function vector_to_array requires a non-null input argument and input type must be " +
+        "`org.apache.spark.ml.linalg.Vector` or `org.apache.spark.mllib.linalg.Vector`, " +
+        s"but got ${valType}"))
+    }
+  }
+}
diff --git a/python/docs/pyspark.ml.rst b/python/docs/pyspark.ml.rst
index 6a5d817..e31dfdd 100644
--- a/python/docs/pyspark.ml.rst
+++ b/python/docs/pyspark.ml.rst
@@ -41,6 +41,14 @@ pyspark.ml.clustering module
     :undoc-members:
     :inherited-members:
 
+pyspark.ml.functions module
+----------------------------
+
+.. automodule:: pyspark.ml.functions
+    :members:
+    :undoc-members:
+    :inherited-members:
+
 pyspark.ml.linalg module
 ----------------------------
 
diff --git a/python/pyspark/ml/functions.py b/python/pyspark/ml/functions.py
new file mode 100644
index 0000000..2b4d8dd
--- /dev/null
+++ b/python/pyspark/ml/functions.py
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark import since, SparkContext
+from pyspark.sql.column import Column, _to_java_column
+
+
+@since(3.0)
+def vector_to_array(col):
+    """
+    Converts a column of MLlib sparse/dense vectors into a column of dense arrays.
+
+    >>> from pyspark.ml.linalg import Vectors
+    >>> from pyspark.ml.functions import vector_to_array
+    >>> from pyspark.mllib.linalg import Vectors as OldVectors
+    >>> df = spark.createDataFrame([
+    ...     (Vectors.dense(1.0, 2.0, 3.0), OldVectors.dense(10.0, 20.0, 30.0)),
+    ...     (Vectors.sparse(3, [(0, 2.0), (2, 3.0)]),
+    ...      OldVectors.sparse(3, [(0, 20.0), (2, 30.0)]))],
+    ...     ["vec", "oldVec"])
+    >>> df.select(vector_to_array("vec").alias("vec"),
+    ...           vector_to_array("oldVec").alias("oldVec")).collect()
+    [Row(vec=[1.0, 2.0, 3.0], oldVec=[10.0, 20.0, 30.0]),
+     Row(vec=[2.0, 0.0, 3.0], oldVec=[20.0, 0.0, 30.0])]
+    """
+    sc = SparkContext._active_spark_context
+    return Column(
+        sc._jvm.org.apache.spark.ml.functions.vector_to_array(_to_java_column(col)))
+
+
+def _test():
+    import doctest
+    from pyspark.sql import SparkSession
+    import pyspark.ml.functions
+    import sys
+    globs = pyspark.ml.functions.__dict__.copy()
+    spark = SparkSession.builder \
+        .master("local[2]") \
+        .appName("ml.functions tests") \
+        .getOrCreate()
+    sc = spark.sparkContext
+    globs['sc'] = sc
+    globs['spark'] = spark
+
+    (failure_count, test_count) = doctest.testmod(
+        pyspark.ml.functions, globs=globs,
+        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
+    spark.stop()
+    if failure_count:
+        sys.exit(-1)
+
+
+if __name__ == "__main__":
+    _test()

