You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jk...@apache.org on 2017/03/26 23:49:31 UTC
spark git commit: [SPARK-19281][PYTHON][ML] spark.ml Python API for FPGrowth
Repository: spark
Updated Branches:
refs/heads/master 617ab6445 -> 0bc8847aa
[SPARK-19281][PYTHON][ML] spark.ml Python API for FPGrowth
## What changes were proposed in this pull request?
- Add `HasSupport`, `HasConfidence`, and `HasItemsCol` `Params` mixins.
- Add new module `pyspark.ml.fpm`.
- Add `FPGrowth` / `FPGrowthModel` wrappers.
- Provide tests for new features.
## How was this patch tested?
Unit tests.
Author: zero323 <ze...@users.noreply.github.com>
Closes #17218 from zero323/SPARK-19281.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0bc8847a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0bc8847a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0bc8847a
Branch: refs/heads/master
Commit: 0bc8847aa216497549c78ad49ec7ac066a059b15
Parents: 617ab64
Author: zero323 <ze...@users.noreply.github.com>
Authored: Sun Mar 26 16:49:27 2017 -0700
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Sun Mar 26 16:49:27 2017 -0700
----------------------------------------------------------------------
dev/sparktestsupport/modules.py | 5 +-
python/docs/pyspark.ml.rst | 8 ++
python/pyspark/ml/fpm.py | 216 +++++++++++++++++++++++++++++++++++
python/pyspark/ml/tests.py | 53 +++++++--
4 files changed, 273 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0bc8847a/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 10ad1fe..eaf1f3a 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -423,15 +423,16 @@ pyspark_ml = Module(
"python/pyspark/ml/"
],
python_test_goals=[
- "pyspark.ml.feature",
"pyspark.ml.classification",
"pyspark.ml.clustering",
+ "pyspark.ml.evaluation",
+ "pyspark.ml.feature",
+ "pyspark.ml.fpm",
"pyspark.ml.linalg.__init__",
"pyspark.ml.recommendation",
"pyspark.ml.regression",
"pyspark.ml.tuning",
"pyspark.ml.tests",
- "pyspark.ml.evaluation",
],
blacklisted_python_implementations=[
"PyPy" # Skip these tests under PyPy since they require numpy and it isn't available there
http://git-wip-us.apache.org/repos/asf/spark/blob/0bc8847a/python/docs/pyspark.ml.rst
----------------------------------------------------------------------
diff --git a/python/docs/pyspark.ml.rst b/python/docs/pyspark.ml.rst
index 26f7415..a681834 100644
--- a/python/docs/pyspark.ml.rst
+++ b/python/docs/pyspark.ml.rst
@@ -80,3 +80,11 @@ pyspark.ml.evaluation module
:members:
:undoc-members:
:inherited-members:
+
+pyspark.ml.fpm module
+----------------------------
+
+.. automodule:: pyspark.ml.fpm
+ :members:
+ :undoc-members:
+ :inherited-members:
http://git-wip-us.apache.org/repos/asf/spark/blob/0bc8847a/python/pyspark/ml/fpm.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py
new file mode 100644
index 0000000..b30d4ed
--- /dev/null
+++ b/python/pyspark/ml/fpm.py
@@ -0,0 +1,216 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark import keyword_only, since
+from pyspark.ml.util import *
+from pyspark.ml.wrapper import JavaEstimator, JavaModel
+from pyspark.ml.param.shared import *
+
+__all__ = ["FPGrowth", "FPGrowthModel"]
+
+
+class HasSupport(Params):
+ """
+ Mixin for param minSupport.
+ """
+
+ minSupport = Param(
+ Params._dummy(),
+ "minSupport",
+ """Minimal support level of the frequent pattern. [0.0, 1.0].
+ Any pattern that appears more than (minSupport * size-of-the-dataset)
+ times will be output""",
+ typeConverter=TypeConverters.toFloat)
+
+ def setMinSupport(self, value):
+ """
+ Sets the value of :py:attr:`minSupport`.
+ """
+ return self._set(minSupport=value)
+
+ def getMinSupport(self):
+ """
+ Gets the value of minSupport or its default value.
+ """
+ return self.getOrDefault(self.minSupport)
+
+
+class HasConfidence(Params):
+ """
+ Mixin for param minConfidence.
+ """
+
+ minConfidence = Param(
+ Params._dummy(),
+ "minConfidence",
+ """Minimal confidence for generating Association Rule. [0.0, 1.0]
+ Note that minConfidence has no effect during fitting.""",
+ typeConverter=TypeConverters.toFloat)
+
+ def setMinConfidence(self, value):
+ """
+ Sets the value of :py:attr:`minConfidence`.
+ """
+ return self._set(minConfidence=value)
+
+ def getMinConfidence(self):
+ """
+ Gets the value of minConfidence or its default value.
+ """
+ return self.getOrDefault(self.minConfidence)
+
+
+class HasItemsCol(Params):
+ """
+ Mixin for param itemsCol: items column name.
+ """
+
+ itemsCol = Param(Params._dummy(), "itemsCol",
+ "items column name", typeConverter=TypeConverters.toString)
+
+ def setItemsCol(self, value):
+ """
+ Sets the value of :py:attr:`itemsCol`.
+ """
+ return self._set(itemsCol=value)
+
+ def getItemsCol(self):
+ """
+ Gets the value of itemsCol or its default value.
+ """
+ return self.getOrDefault(self.itemsCol)
+
+
+class FPGrowthModel(JavaModel, JavaMLWritable, JavaMLReadable):
+ """
+ .. note:: Experimental
+
+ Model fitted by FPGrowth.
+
+ .. versionadded:: 2.2.0
+ """
+ @property
+ @since("2.2.0")
+ def freqItemsets(self):
+ """
+ DataFrame with two columns:
+ * `items` - Itemset of the same type as the input column.
+ * `freq` - Frequency of the itemset (`LongType`).
+ """
+ return self._call_java("freqItemsets")
+
+ @property
+ @since("2.2.0")
+ def associationRules(self):
+ """
+ DataFrame with three columns:
+ * `antecedent` - Array of the same type as the input column.
+ * `consequent` - Array of the same type as the input column.
+ * `confidence` - Confidence for the rule (`DoubleType`).
+ """
+ return self._call_java("associationRules")
+
+
+class FPGrowth(JavaEstimator, HasItemsCol, HasPredictionCol,
+ HasSupport, HasConfidence, JavaMLWritable, JavaMLReadable):
+ """
+ .. note:: Experimental
+
+ A parallel FP-growth algorithm to mine frequent itemsets. The algorithm is described in
+ Li et al., PFP: Parallel FP-Growth for Query Recommendation [LI2008]_.
+ PFP distributes computation in such a way that each worker executes an
+ independent group of mining tasks. The FP-Growth algorithm is described in
+ Han et al., Mining frequent patterns without candidate generation [HAN2000]_.
+
+ .. [LI2008] http://dx.doi.org/10.1145/1454008.1454027
+ .. [HAN2000] http://dx.doi.org/10.1145/335191.335372
+
+ .. note:: null values in the items column (`itemsCol`) are ignored during fit().
+ .. note:: Internally `transform` `collects` and `broadcasts` association rules.
+
+ >>> from pyspark.sql.functions import split
+ >>> data = (spark.read
+ ... .text("data/mllib/sample_fpgrowth.txt")
+ ... .select(split("value", "\s+").alias("items")))
+ >>> data.show(truncate=False)
+ +------------------------+
+ |items |
+ +------------------------+
+ |[r, z, h, k, p] |
+ |[z, y, x, w, v, u, t, s]|
+ |[s, x, o, n, r] |
+ |[x, z, y, m, t, s, q, e]|
+ |[z] |
+ |[x, z, y, r, q, t, p] |
+ +------------------------+
+ >>> fp = FPGrowth(minSupport=0.2, minConfidence=0.7)
+ >>> fpm = fp.fit(data)
+ >>> fpm.freqItemsets.show(5)
+ +---------+----+
+ | items|freq|
+ +---------+----+
+ | [s]| 3|
+ | [s, x]| 3|
+ |[s, x, z]| 2|
+ | [s, z]| 2|
+ | [r]| 3|
+ +---------+----+
+ only showing top 5 rows
+ >>> fpm.associationRules.show(5)
+ +----------+----------+----------+
+ |antecedent|consequent|confidence|
+ +----------+----------+----------+
+ | [t, s]| [y]| 1.0|
+ | [t, s]| [x]| 1.0|
+ | [t, s]| [z]| 1.0|
+ | [p]| [r]| 1.0|
+ | [p]| [z]| 1.0|
+ +----------+----------+----------+
+ only showing top 5 rows
+ >>> new_data = spark.createDataFrame([(["t", "s"], )], ["items"])
+ >>> sorted(fpm.transform(new_data).first().prediction)
+ ['x', 'y', 'z']
+
+ .. versionadded:: 2.2.0
+ """
+ @keyword_only
+ def __init__(self, minSupport=0.3, minConfidence=0.8, itemsCol="items",
+ predictionCol="prediction", numPartitions=None):
+ """
+ __init__(self, minSupport=0.3, minConfidence=0.8, itemsCol="items", \
+ predictionCol="prediction", numPartitions=None)
+ """
+ super(FPGrowth, self).__init__()
+ self._java_obj = self._new_java_obj("org.apache.spark.ml.fpm.FPGrowth", self.uid)
+ self._setDefault(minSupport=0.3, minConfidence=0.8,
+ itemsCol="items", predictionCol="prediction")
+ kwargs = self._input_kwargs
+ self.setParams(**kwargs)
+
+ @keyword_only
+ @since("2.2.0")
+ def setParams(self, minSupport=0.3, minConfidence=0.8, itemsCol="items",
+ predictionCol="prediction", numPartitions=None):
+ """
+ setParams(self, minSupport=0.3, minConfidence=0.8, itemsCol="items", \
+ predictionCol="prediction", numPartitions=None)
+ """
+ kwargs = self._input_kwargs
+ return self._set(**kwargs)
+
+ def _create_model(self, java_model):
+ return FPGrowthModel(java_model)
http://git-wip-us.apache.org/repos/asf/spark/blob/0bc8847a/python/pyspark/ml/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index cc559db..527db9b 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -42,7 +42,7 @@ import tempfile
import array as pyarray
import numpy as np
from numpy import (
- array, array_equal, zeros, inf, random, exp, dot, all, mean, abs, arange, tile, ones)
+ abs, all, arange, array, array_equal, dot, exp, inf, mean, ones, random, tile, zeros)
from numpy import sum as array_sum
import inspect
@@ -50,18 +50,20 @@ from pyspark import keyword_only, SparkContext
from pyspark.ml import Estimator, Model, Pipeline, PipelineModel, Transformer
from pyspark.ml.classification import *
from pyspark.ml.clustering import *
+from pyspark.ml.common import _java2py, _py2java
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.feature import *
-from pyspark.ml.linalg import Vector, SparseVector, DenseVector, VectorUDT,\
- DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT, _convert_to_vector
+from pyspark.ml.fpm import FPGrowth, FPGrowthModel
+from pyspark.ml.linalg import (
+ DenseMatrix, DenseMatrix, DenseVector, Matrices, MatrixUDT,
+ SparseMatrix, SparseVector, Vector, VectorUDT, Vectors, _convert_to_vector)
from pyspark.ml.param import Param, Params, TypeConverters
-from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed
+from pyspark.ml.param.shared import HasInputCol, HasMaxIter, HasSeed
from pyspark.ml.recommendation import ALS
-from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, \
- GeneralizedLinearRegression
+from pyspark.ml.regression import (
+ DecisionTreeRegressor, GeneralizedLinearRegression, LinearRegression)
from pyspark.ml.tuning import *
from pyspark.ml.wrapper import JavaParams, JavaWrapper
-from pyspark.ml.common import _java2py, _py2java
from pyspark.serializers import PickleSerializer
from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql.functions import rand
@@ -1243,6 +1245,43 @@ class GeneralizedLinearRegressionTest(SparkSessionTestCase):
self.assertTrue(np.isclose(model2.intercept, 0.6667, atol=1E-4))
+class FPGrowthTests(SparkSessionTestCase):
+ def setUp(self):
+ super(FPGrowthTests, self).setUp()
+ self.data = self.spark.createDataFrame(
+ [([1, 2], ), ([1, 2], ), ([1, 2, 3], ), ([1, 3], )],
+ ["items"])
+
+ def test_association_rules(self):
+ fp = FPGrowth()
+ fpm = fp.fit(self.data)
+
+ expected_association_rules = self.spark.createDataFrame(
+ [([3], [1], 1.0), ([2], [1], 1.0)],
+ ["antecedent", "consequent", "confidence"]
+ )
+ actual_association_rules = fpm.associationRules
+
+ self.assertEqual(actual_association_rules.subtract(expected_association_rules).count(), 0)
+ self.assertEqual(expected_association_rules.subtract(actual_association_rules).count(), 0)
+
+ def test_freq_itemsets(self):
+ fp = FPGrowth()
+ fpm = fp.fit(self.data)
+
+ expected_freq_itemsets = self.spark.createDataFrame(
+ [([1], 4), ([2], 3), ([2, 1], 3), ([3], 2), ([3, 1], 2)],
+ ["items", "freq"]
+ )
+ actual_freq_itemsets = fpm.freqItemsets
+
+ self.assertEqual(actual_freq_itemsets.subtract(expected_freq_itemsets).count(), 0)
+ self.assertEqual(expected_freq_itemsets.subtract(actual_freq_itemsets).count(), 0)
+
+ def tearDown(self):
+ del self.data
+
+
class ALSTest(SparkSessionTestCase):
def test_storage_levels(self):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org