You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by jackylk <gi...@git.apache.org> on 2014/10/19 11:49:51 UTC

[GitHub] spark pull request: [SPARK-4001][MLlib] adding apriori algorithm f...

GitHub user jackylk opened a pull request:

    https://github.com/apache/spark/pull/2847

    [SPARK-4001][MLlib] adding apriori algorithm for frequent item set mining in Spark

    Apriori is the classic algorithm for frequent item set mining in a transactional data set. It will be useful if Apriori algorithm is added to MLLib in Spark. This PR add an implementation for it. 
    There is a point I am not sure wether it is most efficient. In order to filter out the eligible frequent item set, currently I am using a cartesian operation on two RDDs to calculate the degree of support of each item set, not sure wether it is better to use broadcast variable to achieve the same.
    
    I will add an example to use this algorithm if requires

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jackylk/spark apriori

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2847.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2847
    
----
commit da2cba7e063745aacef74ff555e7bd7c55a24f56
Author: Jacky Li <ja...@huawei.com>
Date:   2014-10-19T09:19:27Z

    adding apriori algorithm for frequent item set mining in Spark

commit 889b33fdfabcc222c82e3bce619aeb6c7031fc58
Author: Jacky Li <ja...@huawei.com>
Date:   2014-10-19T09:31:04Z

    modify per scalastyle check

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273421
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    --- End diff --
    
    Doc minSupport.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding apriori and fp-grow...

Posted by denmoroz <gi...@git.apache.org>.
Github user denmoroz commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-66130537
  
    Dou you use SON algorithm for Apriori parallel implementation?
    (http://importantfish.com/limited-pass-algorithms/)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-70458259
  
    Please test again


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding apriori and fp-grow...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-70041216
  
    add to whitelist


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding apriori algorithm f...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-62083924
  
    Had an offline discussion with @jackylk . We plan to implement a more scalable version of Apriori, as described in PFP: Parallel FP-Growth for Query Recommendation (http://dl.acm.org/citation.cfm?id=1454027)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-72404067
  
    LGTM. Merged into master. Thanks!! (The failed test is a known flakey test. All relevant tests passed.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-72232842
  
      [Test build #26407 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26407/consoleFull) for   PR 2847 at commit [`ec21f7d`](https://github.com/apache/spark/commit/ec21f7dfcad6191e0c2d6d7fd93ac77012098e6c).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273419
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    --- End diff --
    
    Let's use the permanent link: http://dx.doi.org/10.1145/1454008.1454027
    
    It is nice to cite the FP-Growth paper from Han: http://dx.doi.org/10.1145/335191.335372


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273443
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    +
    +  /**
    +   * Constructs a FPGrowth instance with default parameters:
    +   * {minSupport: 0.5}
    +   */
    +  def this() = this(0.5)
    +
    +  /**
    +   * set the minimal support level, default is 0.5
    +   * @param minSupport minimal support level
    +   */
    +  def setMinSupport(minSupport: Double): this.type = {
    +    this.minSupport = minSupport
    +    this
    +  }
    +
    +  /**
    +   * Compute a FPGrowth Model that contains frequent pattern result.
    +   * @param data input data set
    +   * @return FPGrowth Model
    +   */
    +  def run(data: RDD[Array[String]]): FPGrowthModel = {
    +    val model = runAlgorithm(data)
    +    model
    +  }
    +
    +  /**
    +   * Implementation of PFP.
    +   */
    +  private def runAlgorithm(data: RDD[Array[String]]): FPGrowthModel = {
    +    val count = data.count()
    +    val minCount = minSupport * count
    +    val single = generateSingleItem(data, minCount)
    +    val combinations = generateCombinations(data, minCount, single)
    +    new FPGrowthModel(single ++ combinations)
    +  }
    +
    +  /**
    +   * Generate single item pattern by filtering the input data using minimal support level
    +   */
    +  private def generateSingleItem(
    +      data: RDD[Array[String]],
    +      minCount: Double): Array[(String, Int)] = {
    +    data.flatMap(v => v)
    +      .map(v => (v, 1))
    +      .reduceByKey(_ + _)
    +      .filter(_._2 >= minCount)
    +      .collect()
    +      .distinct
    +      .sortWith(_._2 > _._2)
    +  }
    +
    +  /**
    +   * Generate combination of items by computing on FPTree,
    +   * the computation is done on each FPTree partitions.
    --- End diff --
    
    Should document at least the return value.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273532
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    +
    +  /**
    +   * Constructs a FPGrowth instance with default parameters:
    +   * {minSupport: 0.5}
    +   */
    +  def this() = this(0.5)
    +
    +  /**
    +   * set the minimal support level, default is 0.5
    +   * @param minSupport minimal support level
    +   */
    +  def setMinSupport(minSupport: Double): this.type = {
    +    this.minSupport = minSupport
    +    this
    +  }
    +
    +  /**
    +   * Compute a FPGrowth Model that contains frequent pattern result.
    +   * @param data input data set
    +   * @return FPGrowth Model
    +   */
    +  def run(data: RDD[Array[String]]): FPGrowthModel = {
    +    val model = runAlgorithm(data)
    +    model
    +  }
    +
    +  /**
    +   * Implementation of PFP.
    +   */
    +  private def runAlgorithm(data: RDD[Array[String]]): FPGrowthModel = {
    +    val count = data.count()
    +    val minCount = minSupport * count
    +    val single = generateSingleItem(data, minCount)
    +    val combinations = generateCombinations(data, minCount, single)
    +    new FPGrowthModel(single ++ combinations)
    +  }
    +
    +  /**
    +   * Generate single item pattern by filtering the input data using minimal support level
    +   */
    +  private def generateSingleItem(
    +      data: RDD[Array[String]],
    +      minCount: Double): Array[(String, Int)] = {
    +    data.flatMap(v => v)
    +      .map(v => (v, 1))
    +      .reduceByKey(_ + _)
    +      .filter(_._2 >= minCount)
    +      .collect()
    +      .distinct
    +      .sortWith(_._2 > _._2)
    +  }
    +
    +  /**
    +   * Generate combination of items by computing on FPTree,
    +   * the computation is done on each FPTree partitions.
    --- End diff --
    
    For the return type, it would be nice to use `Array[(Array[String], Int)]`. For frequent sets with single elements, we can also use this type to represent them. It is not necessary to build strings.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-72403864
  
      [Test build #26486 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26486/consoleFull) for   PR 2847 at commit [`bee3093`](https://github.com/apache/spark/commit/bee3093daa4c8473a9f531c5fdee353c06cd1bf0).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FPGrowthModel(val freqItemsets: RDD[(Array[String], Long)]) extends Serializable`
      * `  class Node[T](val parent: Node[T]) extends Serializable `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273444
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    +
    +  /**
    +   * Constructs a FPGrowth instance with default parameters:
    +   * {minSupport: 0.5}
    +   */
    +  def this() = this(0.5)
    +
    +  /**
    +   * set the minimal support level, default is 0.5
    +   * @param minSupport minimal support level
    +   */
    +  def setMinSupport(minSupport: Double): this.type = {
    +    this.minSupport = minSupport
    +    this
    +  }
    +
    +  /**
    +   * Compute a FPGrowth Model that contains frequent pattern result.
    +   * @param data input data set
    +   * @return FPGrowth Model
    +   */
    +  def run(data: RDD[Array[String]]): FPGrowthModel = {
    +    val model = runAlgorithm(data)
    +    model
    +  }
    +
    +  /**
    +   * Implementation of PFP.
    +   */
    +  private def runAlgorithm(data: RDD[Array[String]]): FPGrowthModel = {
    +    val count = data.count()
    +    val minCount = minSupport * count
    +    val single = generateSingleItem(data, minCount)
    +    val combinations = generateCombinations(data, minCount, single)
    +    new FPGrowthModel(single ++ combinations)
    +  }
    +
    +  /**
    +   * Generate single item pattern by filtering the input data using minimal support level
    +   */
    +  private def generateSingleItem(
    +      data: RDD[Array[String]],
    +      minCount: Double): Array[(String, Int)] = {
    +    data.flatMap(v => v)
    +      .map(v => (v, 1))
    +      .reduceByKey(_ + _)
    +      .filter(_._2 >= minCount)
    +      .collect()
    +      .distinct
    +      .sortWith(_._2 > _._2)
    +  }
    +
    +  /**
    +   * Generate combination of items by computing on FPTree,
    +   * the computation is done on each FPTree partitions.
    +   */
    +  private def generateCombinations(
    +      data: RDD[Array[String]],
    +      minCount: Double,
    +      singleItem: Array[(String, Int)]): Array[(String, Int)] = {
    +    val single = data.context.broadcast(singleItem)
    +    data.flatMap(basket => createFPTree(basket, single))
    +      .groupByKey()
    --- End diff --
    
    Should it be an `aggregateByKey`? `GroupByKey` collects the data to a single reducer, which usually causes scalability issues.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273436
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    +
    +  /**
    +   * Constructs a FPGrowth instance with default parameters:
    +   * {minSupport: 0.5}
    +   */
    +  def this() = this(0.5)
    +
    +  /**
    +   * set the minimal support level, default is 0.5
    +   * @param minSupport minimal support level
    +   */
    +  def setMinSupport(minSupport: Double): this.type = {
    +    this.minSupport = minSupport
    +    this
    +  }
    +
    +  /**
    +   * Compute a FPGrowth Model that contains frequent pattern result.
    +   * @param data input data set
    +   * @return FPGrowth Model
    +   */
    +  def run(data: RDD[Array[String]]): FPGrowthModel = {
    +    val model = runAlgorithm(data)
    +    model
    +  }
    +
    +  /**
    +   * Implementation of PFP.
    +   */
    +  private def runAlgorithm(data: RDD[Array[String]]): FPGrowthModel = {
    +    val count = data.count()
    +    val minCount = minSupport * count
    +    val single = generateSingleItem(data, minCount)
    +    val combinations = generateCombinations(data, minCount, single)
    +    new FPGrowthModel(single ++ combinations)
    +  }
    +
    +  /**
    +   * Generate single item pattern by filtering the input data using minimal support level
    +   */
    +  private def generateSingleItem(
    +      data: RDD[Array[String]],
    +      minCount: Double): Array[(String, Int)] = {
    +    data.flatMap(v => v)
    +      .map(v => (v, 1))
    +      .reduceByKey(_ + _)
    +      .filter(_._2 >= minCount)
    +      .collect()
    +      .distinct
    --- End diff --
    
    Why do we need `distinct`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273447
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    +
    +  /**
    +   * Constructs a FPGrowth instance with default parameters:
    +   * {minSupport: 0.5}
    +   */
    +  def this() = this(0.5)
    +
    +  /**
    +   * set the minimal support level, default is 0.5
    +   * @param minSupport minimal support level
    +   */
    +  def setMinSupport(minSupport: Double): this.type = {
    +    this.minSupport = minSupport
    +    this
    +  }
    +
    +  /**
    +   * Compute a FPGrowth Model that contains frequent pattern result.
    +   * @param data input data set
    +   * @return FPGrowth Model
    +   */
    +  def run(data: RDD[Array[String]]): FPGrowthModel = {
    +    val model = runAlgorithm(data)
    +    model
    +  }
    +
    +  /**
    +   * Implementation of PFP.
    +   */
    +  private def runAlgorithm(data: RDD[Array[String]]): FPGrowthModel = {
    +    val count = data.count()
    +    val minCount = minSupport * count
    +    val single = generateSingleItem(data, minCount)
    +    val combinations = generateCombinations(data, minCount, single)
    +    new FPGrowthModel(single ++ combinations)
    +  }
    +
    +  /**
    +   * Generate single item pattern by filtering the input data using minimal support level
    +   */
    +  private def generateSingleItem(
    +      data: RDD[Array[String]],
    +      minCount: Double): Array[(String, Int)] = {
    +    data.flatMap(v => v)
    +      .map(v => (v, 1))
    +      .reduceByKey(_ + _)
    +      .filter(_._2 >= minCount)
    +      .collect()
    +      .distinct
    +      .sortWith(_._2 > _._2)
    +  }
    +
    +  /**
    +   * Generate combination of items by computing on FPTree,
    +   * the computation is done on each FPTree partitions.
    +   */
    +  private def generateCombinations(
    +      data: RDD[Array[String]],
    +      minCount: Double,
    +      singleItem: Array[(String, Int)]): Array[(String, Int)] = {
    +    val single = data.context.broadcast(singleItem)
    +    data.flatMap(basket => createFPTree(basket, single))
    +      .groupByKey()
    +      .flatMap(partition => runFPTree(partition, minCount))
    +      .collect()
    +  }
    +
    +  /**
    +   * Create FP-Tree partition for the giving basket
    +   */
    +  private def createFPTree(
    +      basket: Array[String],
    +      singleItem: Broadcast[Array[(String, Int)]]): Array[(String, Array[String])] = {
    +    var output = ArrayBuffer[(String, Array[String])]()
    +    var combination = ArrayBuffer[String]()
    +    val single = singleItem.value
    +    var items = ArrayBuffer[(String, Int)]()
    +
    +    // Filter the basket by single item pattern
    +    val iterator = basket.iterator
    --- End diff --
    
    We don't need an iterator. The code could be simplified to
    
    ~~~
    val candidates = basket.filter(single.contains).map(item => (item, single(item)))
    ~~~


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273429
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    +
    +  /**
    +   * Constructs a FPGrowth instance with default parameters:
    +   * {minSupport: 0.5}
    +   */
    +  def this() = this(0.5)
    +
    +  /**
    +   * set the minimal support level, default is 0.5
    +   * @param minSupport minimal support level
    +   */
    +  def setMinSupport(minSupport: Double): this.type = {
    +    this.minSupport = minSupport
    +    this
    +  }
    +
    +  /**
    +   * Compute a FPGrowth Model that contains frequent pattern result.
    +   * @param data input data set
    +   * @return FPGrowth Model
    +   */
    +  def run(data: RDD[Array[String]]): FPGrowthModel = {
    +    val model = runAlgorithm(data)
    +    model
    +  }
    +
    +  /**
    +   * Implementation of PFP.
    +   */
    +  private def runAlgorithm(data: RDD[Array[String]]): FPGrowthModel = {
    +    val count = data.count()
    +    val minCount = minSupport * count
    +    val single = generateSingleItem(data, minCount)
    +    val combinations = generateCombinations(data, minCount, single)
    +    new FPGrowthModel(single ++ combinations)
    +  }
    +
    +  /**
    +   * Generate single item pattern by filtering the input data using minimal support level
    +   */
    +  private def generateSingleItem(
    +      data: RDD[Array[String]],
    +      minCount: Double): Array[(String, Int)] = {
    +    data.flatMap(v => v)
    --- End diff --
    
    Are we assuming that the items inside the same basket are distinct? If not, this could be changed to `v => v.toSet`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273440
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    +
    +  /**
    +   * Constructs a FPGrowth instance with default parameters:
    +   * {minSupport: 0.5}
    +   */
    +  def this() = this(0.5)
    +
    +  /**
    +   * set the minimal support level, default is 0.5
    +   * @param minSupport minimal support level
    +   */
    +  def setMinSupport(minSupport: Double): this.type = {
    +    this.minSupport = minSupport
    +    this
    +  }
    +
    +  /**
    +   * Compute a FPGrowth Model that contains frequent pattern result.
    +   * @param data input data set
    +   * @return FPGrowth Model
    +   */
    +  def run(data: RDD[Array[String]]): FPGrowthModel = {
    +    val model = runAlgorithm(data)
    +    model
    +  }
    +
    +  /**
    +   * Implementation of PFP.
    +   */
    +  private def runAlgorithm(data: RDD[Array[String]]): FPGrowthModel = {
    +    val count = data.count()
    +    val minCount = minSupport * count
    +    val single = generateSingleItem(data, minCount)
    +    val combinations = generateCombinations(data, minCount, single)
    +    new FPGrowthModel(single ++ combinations)
    +  }
    +
    +  /**
    +   * Generate single item pattern by filtering the input data using minimal support level
    +   */
    +  private def generateSingleItem(
    +      data: RDD[Array[String]],
    +      minCount: Double): Array[(String, Int)] = {
    +    data.flatMap(v => v)
    +      .map(v => (v, 1))
    +      .reduceByKey(_ + _)
    +      .filter(_._2 >= minCount)
    +      .collect()
    +      .distinct
    +      .sortWith(_._2 > _._2)
    --- End diff --
    
    `.sortBy(-_._2)` may be easier to understand. It is hard to remember which one results descending ordering, `(_._2 > _._2)` or `(_._2 < _._2)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273418
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    --- End diff --
    
    `FPGrowth` -> `FP-growth`, use the same name appeared in the paper.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273422
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    +
    +  /**
    +   * Constructs a FPGrowth instance with default parameters:
    +   * {minSupport: 0.5}
    +   */
    +  def this() = this(0.5)
    --- End diff --
    
    Is it too high? How do you set the default value?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-72403870
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26486/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273451
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    +
    +  /**
    +   * Constructs a FPGrowth instance with default parameters:
    +   * {minSupport: 0.5}
    +   */
    +  def this() = this(0.5)
    +
    +  /**
    +   * set the minimal support level, default is 0.5
    +   * @param minSupport minimal support level
    +   */
    +  def setMinSupport(minSupport: Double): this.type = {
    +    this.minSupport = minSupport
    +    this
    +  }
    +
    +  /**
    +   * Compute a FPGrowth Model that contains frequent pattern result.
    +   * @param data input data set
    +   * @return FPGrowth Model
    +   */
    +  def run(data: RDD[Array[String]]): FPGrowthModel = {
    +    val model = runAlgorithm(data)
    +    model
    +  }
    +
    +  /**
    +   * Implementation of PFP.
    +   */
    +  private def runAlgorithm(data: RDD[Array[String]]): FPGrowthModel = {
    +    val count = data.count()
    +    val minCount = minSupport * count
    +    val single = generateSingleItem(data, minCount)
    +    val combinations = generateCombinations(data, minCount, single)
    +    new FPGrowthModel(single ++ combinations)
    +  }
    +
    +  /**
    +   * Generate single item pattern by filtering the input data using minimal support level
    +   */
    +  private def generateSingleItem(
    +      data: RDD[Array[String]],
    +      minCount: Double): Array[(String, Int)] = {
    +    data.flatMap(v => v)
    +      .map(v => (v, 1))
    +      .reduceByKey(_ + _)
    +      .filter(_._2 >= minCount)
    +      .collect()
    +      .distinct
    +      .sortWith(_._2 > _._2)
    +  }
    +
    +  /**
    +   * Generate combination of items by computing on FPTree,
    +   * the computation is done on each FPTree partitions.
    +   */
    +  private def generateCombinations(
    +      data: RDD[Array[String]],
    +      minCount: Double,
    +      singleItem: Array[(String, Int)]): Array[(String, Int)] = {
    +    val single = data.context.broadcast(singleItem)
    +    data.flatMap(basket => createFPTree(basket, single))
    +      .groupByKey()
    +      .flatMap(partition => runFPTree(partition, minCount))
    +      .collect()
    +  }
    +
    +  /**
    +   * Create FP-Tree partition for the giving basket
    +   */
    +  private def createFPTree(
    +      basket: Array[String],
    +      singleItem: Broadcast[Array[(String, Int)]]): Array[(String, Array[String])] = {
    +    var output = ArrayBuffer[(String, Array[String])]()
    +    var combination = ArrayBuffer[String]()
    +    val single = singleItem.value
    +    var items = ArrayBuffer[(String, Int)]()
    +
    +    // Filter the basket by single item pattern
    +    val iterator = basket.iterator
    +    while (iterator.hasNext){
    +      val item = iterator.next
    +      val opt = single.find(_._1.equals(item))
    +      if (opt != None) {
    +        items ++= opt
    +      }
    +    }
    +
    +    // Sort it and create the item combinations
    +    val sortedItems = items.sortWith(_._1 > _._1).sortWith(_._2 > _._2).toArray
    +    val itemIterator = sortedItems.iterator
    +    while (itemIterator.hasNext) {
    +      combination.clear()
    +      val item = itemIterator.next
    +      val firstNItems = sortedItems.take(sortedItems.indexOf(item))
    +      if (firstNItems.length > 0) {
    +        val iterator = firstNItems.iterator
    +        while (iterator.hasNext) {
    +          val elem = iterator.next
    +          combination += elem._1
    +        }
    +        output += ((item._1, combination.toArray))
    --- End diff --
    
    I don't see how this implements the Mapper in Algorithm 4 of the PFP paper. The code here doesn't filter the output. For example, if we have `a b c d e f` and `c` and `f` are hashed into the same partition, we only need to send `a b c d e f` but not `a b c` to the same partition.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by zhangyouhua2014 <gi...@git.apache.org>.
Github user zhangyouhua2014 commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-71163811
  
    @mengxr . I am working with Jacky together to develop and test this algorithm. I answered this question:
    We refer to the PFP paper, but reduces the process of building the tree, omit this process it can use this time to do other things. Specific steps are as follows:
    1, the transaction database DB is distributed to more than one worker nodes, after two scans transaction database, get conditional pattern sequences.
       1.1, the first scan DB, get a frequent itemsets L1. For example: (a, 6), (b, 5), (c, 3)
       1.2, according to 1.1) was L1 scanning DB again, to filter out non-frequent item, get conditional pattern sequence conditionSEQ. For example: (c, (a, b)), (b, (a)),
       After two scans DB get conditionSEQ, conditionSEQ DB is much smaller than the amount of information.
    2, reduce operations performed using groupByKey operator will conditionSEQ on a machine of the same key into the presence of the same key conditionSEQ worker set on each machine after the merger. The following is based conditionSEQ to mining frequent item sets.
    3, on each worker, using a priori principle of collective operations conditionSEQ find frequent item sets.
    4. Finally, the use of operators collect aggregate results.
      DB algorithm change will spread across multiple worker nodes only need to scan twice to obtain the conditions set pattern sequence conditionSEQ small amount of information in the collection; frequent item set mining is onditionSEQ processed only once reduce, network interaction is small, so fast.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-72231925
  
      [Test build #26406 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26406/consoleFull) for   PR 2847 at commit [`93f3280`](https://github.com/apache/spark/commit/93f3280fb1b9897f40b695683824aef619a5b8c2).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by zhangyouhua2014 <gi...@git.apache.org>.
Github user zhangyouhua2014 commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-71422810
  
    @mengxr 
    1 I mean I use step 1(that Equivalent to create FPTree and condition FPTree ) we have reduce data size and create condition FPTree(only include frequently item not transition data), when using condition FPTree mining frequently item set,it is have a small candidate set.
    2 I have test it and compared mahout pfp,it is a good performance that about 10 time.
    3 afer use groupByKey,ming frequently item set in each node that include Specified key,so it is not network communication overhead.
    4 is there have aggregateByKey operator in new spark version?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273416
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    --- End diff --
    
    order imports into groups: https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports
    
    For a mutable collection, we can import it directly if there is no immutable collection with the same name, e.g., ArrayBuffer. Otherwise, we only import `scala.collection.mutable` and use `mutable.Map` in the code to be explicit.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273455
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    +
    +  /**
    +   * Constructs a FPGrowth instance with default parameters:
    +   * {minSupport: 0.5}
    +   */
    +  def this() = this(0.5)
    +
    +  /**
    +   * set the minimal support level, default is 0.5
    +   * @param minSupport minimal support level
    +   */
    +  def setMinSupport(minSupport: Double): this.type = {
    +    this.minSupport = minSupport
    +    this
    +  }
    +
    +  /**
    +   * Compute a FPGrowth Model that contains frequent pattern result.
    +   * @param data input data set
    +   * @return FPGrowth Model
    +   */
    +  def run(data: RDD[Array[String]]): FPGrowthModel = {
    +    val model = runAlgorithm(data)
    +    model
    +  }
    +
    +  /**
    +   * Implementation of PFP.
    +   */
    +  private def runAlgorithm(data: RDD[Array[String]]): FPGrowthModel = {
    +    val count = data.count()
    +    val minCount = minSupport * count
    +    val single = generateSingleItem(data, minCount)
    +    val combinations = generateCombinations(data, minCount, single)
    +    new FPGrowthModel(single ++ combinations)
    +  }
    +
    +  /**
    +   * Generate single item pattern by filtering the input data using minimal support level
    +   */
    +  private def generateSingleItem(
    +      data: RDD[Array[String]],
    +      minCount: Double): Array[(String, Int)] = {
    +    data.flatMap(v => v)
    +      .map(v => (v, 1))
    +      .reduceByKey(_ + _)
    +      .filter(_._2 >= minCount)
    +      .collect()
    +      .distinct
    +      .sortWith(_._2 > _._2)
    +  }
    +
    +  /**
    +   * Generate combination of items by computing on FPTree,
    +   * the computation is done on each FPTree partitions.
    +   */
    +  private def generateCombinations(
    +      data: RDD[Array[String]],
    +      minCount: Double,
    +      singleItem: Array[(String, Int)]): Array[(String, Int)] = {
    +    val single = data.context.broadcast(singleItem)
    +    data.flatMap(basket => createFPTree(basket, single))
    +      .groupByKey()
    +      .flatMap(partition => runFPTree(partition, minCount))
    +      .collect()
    +  }
    +
    +  /**
    +   * Create FP-Tree partition for the giving basket
    +   */
    +  private def createFPTree(
    +      basket: Array[String],
    +      singleItem: Broadcast[Array[(String, Int)]]): Array[(String, Array[String])] = {
    +    var output = ArrayBuffer[(String, Array[String])]()
    +    var combination = ArrayBuffer[String]()
    +    val single = singleItem.value
    +    var items = ArrayBuffer[(String, Int)]()
    +
    +    // Filter the basket by single item pattern
    +    val iterator = basket.iterator
    +    while (iterator.hasNext){
    +      val item = iterator.next
    +      val opt = single.find(_._1.equals(item))
    +      if (opt != None) {
    +        items ++= opt
    +      }
    +    }
    +
    +    // Sort it and create the item combinations
    +    val sortedItems = items.sortWith(_._1 > _._1).sortWith(_._2 > _._2).toArray
    +    val itemIterator = sortedItems.iterator
    +    while (itemIterator.hasNext) {
    +      combination.clear()
    +      val item = itemIterator.next
    +      val firstNItems = sortedItems.take(sortedItems.indexOf(item))
    +      if (firstNItems.length > 0) {
    +        val iterator = firstNItems.iterator
    +        while (iterator.hasNext) {
    +          val elem = iterator.next
    +          combination += elem._1
    +        }
    +        output += ((item._1, combination.toArray))
    +      }
    +    }
    +    output.toArray
    +  }
    +
    +  /**
    +   * Generate frequent pattern by walking through the FPTree
    +   */
    +  private def runFPTree(
    --- End diff --
    
    Can we make it a method inside `object FPGrowth`? It is worth having unit tests for FP-tree building.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding apriori and fp-grow...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-70027659
  
    Had an offline discussion with @jackylk and here is the summary:
    
    1. Keep only the parallel FP-Growth implementation, because it is generally more efficient than Apriori, especially on medium/large datasets. @jackylk can share some performance testing results.
    2. Rename the package "fim" (frequent itemset mining) to "fpm" (frequent pattern mining). There is no standard acronym this family of mining algorithms. Frequent patten mining is a broader term than frequent itemset mining. This package name is also used in Mahout.
    3. Include links to the original FP-Growth paper and the PFP paper in the doc.
    4. Have FPGrowth take `minSupport` at a parameter and implement `run(RDD...): FPGrowthModel`, where FPGrowthModel holds an RDD of frequent itemsets and counts.
    5. Hide methods used internally.
    6. Update code style: a) remove extra empty lines; b) fix indentation; c) change variable names; d) line width; etc.
    7. Check whether we can use generic type for items (for Java API).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding apriori and fp-grow...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-70452833
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25742/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-72244002
  
      [Test build #26407 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26407/consoleFull) for   PR 2847 at commit [`ec21f7d`](https://github.com/apache/spark/commit/ec21f7dfcad6191e0c2d6d7fd93ac77012098e6c).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FPGrowthModel (val frequentPattern: Array[(Array[String], Long)]) extends Serializable `
      * `class FPTree extends Serializable `
      * `class FPTreeNode(val item: String, var count: Int) extends Serializable `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/2847


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-71526153
  
    > 1 I mean I use step 1(that Equivalent to create FPTree and condition FPTree ) we have reduce data size and create condition FPTree(only include frequently item not transition data), when using condition FPTree mining frequently item set,it is have a small candidate set.
    
    The advantage of FP-Growth over Apriori is the tree structure to present candidate set. Both algorithms are taking advantage on the fact that the candidate set is small. I'm asking whether the current implementation uses the tree structure to save communication.
    
    > 2 I have test it and compared mahout pfp,it is a good performance that about 10 time.
    
    I'm not surprised by the 10x speed-up. It is not equivalent to say the current implementation is correct and high-performance. I believe that we can be much faster.
    
    > 3 afer use groupByKey,ming frequently item set in each node that include Specified key,so it is not network communication overhead.
    
    `groupByKey` collects everything to reducers. `aggregateByKey` does part of the aggregation on mappers. There is definitely space for improvement.
    
    > 4 is there have aggregateByKey operator in new spark version?
    
    http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-70465058
  
      [Test build #25752 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25752/consoleFull) for   PR 2847 at commit [`d110ab2`](https://github.com/apache/spark/commit/d110ab2883b0aa85cf0c94f6ca2f12cc0ea73a37).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273426
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    +
    +  /**
    +   * Constructs a FPGrowth instance with default parameters:
    +   * {minSupport: 0.5}
    +   */
    +  def this() = this(0.5)
    +
    +  /**
    +   * set the minimal support level, default is 0.5
    +   * @param minSupport minimal support level
    +   */
    +  def setMinSupport(minSupport: Double): this.type = {
    +    this.minSupport = minSupport
    +    this
    +  }
    +
    +  /**
    +   * Compute a FPGrowth Model that contains frequent pattern result.
    +   * @param data input data set
    +   * @return FPGrowth Model
    +   */
    +  def run(data: RDD[Array[String]]): FPGrowthModel = {
    +    val model = runAlgorithm(data)
    +    model
    +  }
    +
    +  /**
    +   * Implementation of PFP.
    +   */
    +  private def runAlgorithm(data: RDD[Array[String]]): FPGrowthModel = {
    --- End diff --
    
    It is not necessary to have `runAlgorithm`, which is the same as `run`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding apriori and fp-grow...

Posted by erikerlandson <gi...@git.apache.org>.
Github user erikerlandson commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-66129552
  
    As long as itemset mining is under consideration, has anybody tried a Spark implementation of "Logical Itemset Mining":
    http://cvit.iiit.ac.in/papers/Chandrashekar2012Logical.pdf



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273448
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    +
    +  /**
    +   * Constructs a FPGrowth instance with default parameters:
    +   * {minSupport: 0.5}
    +   */
    +  def this() = this(0.5)
    +
    +  /**
    +   * set the minimal support level, default is 0.5
    +   * @param minSupport minimal support level
    +   */
    +  def setMinSupport(minSupport: Double): this.type = {
    +    this.minSupport = minSupport
    +    this
    +  }
    +
    +  /**
    +   * Compute a FPGrowth Model that contains frequent pattern result.
    +   * @param data input data set
    +   * @return FPGrowth Model
    +   */
    +  def run(data: RDD[Array[String]]): FPGrowthModel = {
    +    val model = runAlgorithm(data)
    +    model
    +  }
    +
    +  /**
    +   * Implementation of PFP.
    +   */
    +  private def runAlgorithm(data: RDD[Array[String]]): FPGrowthModel = {
    +    val count = data.count()
    +    val minCount = minSupport * count
    +    val single = generateSingleItem(data, minCount)
    +    val combinations = generateCombinations(data, minCount, single)
    +    new FPGrowthModel(single ++ combinations)
    +  }
    +
    +  /**
    +   * Generate single item pattern by filtering the input data using minimal support level
    +   */
    +  private def generateSingleItem(
    +      data: RDD[Array[String]],
    +      minCount: Double): Array[(String, Int)] = {
    +    data.flatMap(v => v)
    +      .map(v => (v, 1))
    +      .reduceByKey(_ + _)
    +      .filter(_._2 >= minCount)
    +      .collect()
    +      .distinct
    +      .sortWith(_._2 > _._2)
    +  }
    +
    +  /**
    +   * Generate combination of items by computing on FPTree,
    +   * the computation is done on each FPTree partitions.
    +   */
    +  private def generateCombinations(
    +      data: RDD[Array[String]],
    +      minCount: Double,
    +      singleItem: Array[(String, Int)]): Array[(String, Int)] = {
    +    val single = data.context.broadcast(singleItem)
    +    data.flatMap(basket => createFPTree(basket, single))
    +      .groupByKey()
    +      .flatMap(partition => runFPTree(partition, minCount))
    +      .collect()
    +  }
    +
    +  /**
    +   * Create FP-Tree partition for the giving basket
    +   */
    +  private def createFPTree(
    +      basket: Array[String],
    +      singleItem: Broadcast[Array[(String, Int)]]): Array[(String, Array[String])] = {
    +    var output = ArrayBuffer[(String, Array[String])]()
    +    var combination = ArrayBuffer[String]()
    +    val single = singleItem.value
    +    var items = ArrayBuffer[(String, Int)]()
    +
    +    // Filter the basket by single item pattern
    +    val iterator = basket.iterator
    +    while (iterator.hasNext){
    +      val item = iterator.next
    +      val opt = single.find(_._1.equals(item))
    --- End diff --
    
    If we need to test membership, should we make `single` a map?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding apriori and fp-grow...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-70041264
  
      [Test build #25596 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25596/consoleFull) for   PR 2847 at commit [`7b77ad7`](https://github.com/apache/spark/commit/7b77ad74a7de9af8e720f37430ce4b75651298be).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-72232101
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26406/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding apriori and fp-grow...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-70450297
  
      [Test build #25742 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25742/consoleFull) for   PR 2847 at commit [`eb3e4ca`](https://github.com/apache/spark/commit/eb3e4ca0709696b6b2b8afd1cfc56a5a9f87555d).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-72401147
  
      [Test build #26486 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26486/consoleFull) for   PR 2847 at commit [`bee3093`](https://github.com/apache/spark/commit/bee3093daa4c8473a9f531c5fdee353c06cd1bf0).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-72301170
  
    I have not tested performance yet. I will test it at weekend


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding apriori and fp-grow...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-70452828
  
      [Test build #25742 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25742/consoleFull) for   PR 2847 at commit [`eb3e4ca`](https://github.com/apache/spark/commit/eb3e4ca0709696b6b2b8afd1cfc56a5a9f87555d).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-72232096
  
      [Test build #26406 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26406/consoleFull) for   PR 2847 at commit [`93f3280`](https://github.com/apache/spark/commit/93f3280fb1b9897f40b695683824aef619a5b8c2).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FPGrowthModel (val frequentPattern: Array[(Array[String], Long)]) extends Serializable `
      * `class FPTree extends Serializable `
      * `class FPTreeNode(val item: String, var count: Int) extends Serializable `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding apriori algorithm f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-60014041
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding apriori and fp-grow...

Posted by denmoroz <gi...@git.apache.org>.
Github user denmoroz commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-66125004
  
    Maybe it is better to use RDD[BitSet] as transactions RDD? Then you can add a preprocessor trait and make any transformations for source RDD to RDD of BitSets. For example, transformation of RDD[Array[String]] to RDD[BitSet].
    It seems to me, that BitSet is the much better idea of transactions representation then Array[String] or Array[Int] or anything else.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding apriori and fp-grow...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-70450975
  
    Yes, I have tested the parallel FP-Growth algorithm using a open data set from http://fimi.ua.ac.be/data/, performance test result can be found at https://issues.apache.org/jira/browse/SPARK-4001
    
    All modification is done except for the 7th (generic type), please review the code for now.
    I am still considering whether it is worthy to implement generic type since it adds more complexity to the code


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-71765551
  
    Had an offline discussion with @jackylk and @zhangyouhua2014 . We plan to add a utility class named `FPTree` with the following:
    
    ~~~
    def add(transaction: Array[String]): this.type
    
    def merge(tree: FPTree): this.type
    
    def extract(threshold: Int, validateSuffix: String => Boolean): Iterator[Array[String]]
    ~~~
    
    and the use `aggregateByKey` to grow trees in parallel.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding apriori algorithm f...

Posted by varadharajan <gi...@git.apache.org>.
Github user varadharajan commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-62263230
  
    As mentioned in comments of SPARK-2432. I was wondering how the PFP versions compared with YAFIM (http://pasa-bigdata.nju.edu.cn/people/ronggu/pub/YAFIM_ParLearning.pdf). Probably i will do a bit more reading on this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273453
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    +
    +  /**
    +   * Constructs a FPGrowth instance with default parameters:
    +   * {minSupport: 0.5}
    +   */
    +  def this() = this(0.5)
    +
    +  /**
    +   * set the minimal support level, default is 0.5
    +   * @param minSupport minimal support level
    +   */
    +  def setMinSupport(minSupport: Double): this.type = {
    +    this.minSupport = minSupport
    +    this
    +  }
    +
    +  /**
    +   * Compute a FPGrowth Model that contains frequent pattern result.
    +   * @param data input data set
    +   * @return FPGrowth Model
    +   */
    +  def run(data: RDD[Array[String]]): FPGrowthModel = {
    +    val model = runAlgorithm(data)
    +    model
    +  }
    +
    +  /**
    +   * Implementation of PFP.
    +   */
    +  private def runAlgorithm(data: RDD[Array[String]]): FPGrowthModel = {
    +    val count = data.count()
    +    val minCount = minSupport * count
    +    val single = generateSingleItem(data, minCount)
    +    val combinations = generateCombinations(data, minCount, single)
    +    new FPGrowthModel(single ++ combinations)
    +  }
    +
    +  /**
    +   * Generate single item pattern by filtering the input data using minimal support level
    +   */
    +  private def generateSingleItem(
    +      data: RDD[Array[String]],
    +      minCount: Double): Array[(String, Int)] = {
    +    data.flatMap(v => v)
    +      .map(v => (v, 1))
    +      .reduceByKey(_ + _)
    +      .filter(_._2 >= minCount)
    +      .collect()
    +      .distinct
    +      .sortWith(_._2 > _._2)
    +  }
    +
    +  /**
    +   * Generate combination of items by computing on FPTree,
    +   * the computation is done on each FPTree partitions.
    +   */
    +  private def generateCombinations(
    +      data: RDD[Array[String]],
    +      minCount: Double,
    +      singleItem: Array[(String, Int)]): Array[(String, Int)] = {
    +    val single = data.context.broadcast(singleItem)
    +    data.flatMap(basket => createFPTree(basket, single))
    +      .groupByKey()
    +      .flatMap(partition => runFPTree(partition, minCount))
    +      .collect()
    +  }
    +
    +  /**
    +   * Create FP-Tree partition for the giving basket
    +   */
    +  private def createFPTree(
    +      basket: Array[String],
    +      singleItem: Broadcast[Array[(String, Int)]]): Array[(String, Array[String])] = {
    +    var output = ArrayBuffer[(String, Array[String])]()
    +    var combination = ArrayBuffer[String]()
    +    val single = singleItem.value
    +    var items = ArrayBuffer[(String, Int)]()
    +
    +    // Filter the basket by single item pattern
    +    val iterator = basket.iterator
    +    while (iterator.hasNext){
    +      val item = iterator.next
    +      val opt = single.find(_._1.equals(item))
    +      if (opt != None) {
    +        items ++= opt
    +      }
    +    }
    +
    +    // Sort it and create the item combinations
    +    val sortedItems = items.sortWith(_._1 > _._1).sortWith(_._2 > _._2).toArray
    +    val itemIterator = sortedItems.iterator
    +    while (itemIterator.hasNext) {
    +      combination.clear()
    +      val item = itemIterator.next
    +      val firstNItems = sortedItems.take(sortedItems.indexOf(item))
    +      if (firstNItems.length > 0) {
    +        val iterator = firstNItems.iterator
    +        while (iterator.hasNext) {
    +          val elem = iterator.next
    +          combination += elem._1
    +        }
    +        output += ((item._1, combination.toArray))
    +      }
    +    }
    +    output.toArray
    +  }
    +
    +  /**
    +   * Generate frequent pattern by walking through the FPTree
    --- End diff --
    
    Missing doc on the input args and the return value.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding apriori algorithm f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-59644841
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding apriori and fp-grow...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-70041330
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25596/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-72244009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26407/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273445
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    +
    +  /**
    +   * Constructs a FPGrowth instance with default parameters:
    +   * {minSupport: 0.5}
    +   */
    +  def this() = this(0.5)
    +
    +  /**
    +   * set the minimal support level, default is 0.5
    +   * @param minSupport minimal support level
    +   */
    +  def setMinSupport(minSupport: Double): this.type = {
    +    this.minSupport = minSupport
    +    this
    +  }
    +
    +  /**
    +   * Compute a FPGrowth Model that contains frequent pattern result.
    +   * @param data input data set
    +   * @return FPGrowth Model
    +   */
    +  def run(data: RDD[Array[String]]): FPGrowthModel = {
    +    val model = runAlgorithm(data)
    +    model
    +  }
    +
    +  /**
    +   * Implementation of PFP.
    +   */
    +  private def runAlgorithm(data: RDD[Array[String]]): FPGrowthModel = {
    +    val count = data.count()
    +    val minCount = minSupport * count
    +    val single = generateSingleItem(data, minCount)
    +    val combinations = generateCombinations(data, minCount, single)
    +    new FPGrowthModel(single ++ combinations)
    +  }
    +
    +  /**
    +   * Generate single item pattern by filtering the input data using minimal support level
    +   */
    +  private def generateSingleItem(
    +      data: RDD[Array[String]],
    +      minCount: Double): Array[(String, Int)] = {
    +    data.flatMap(v => v)
    +      .map(v => (v, 1))
    +      .reduceByKey(_ + _)
    +      .filter(_._2 >= minCount)
    +      .collect()
    +      .distinct
    +      .sortWith(_._2 > _._2)
    +  }
    +
    +  /**
    +   * Generate combination of items by computing on FPTree,
    +   * the computation is done on each FPTree partitions.
    +   */
    +  private def generateCombinations(
    +      data: RDD[Array[String]],
    +      minCount: Double,
    +      singleItem: Array[(String, Int)]): Array[(String, Int)] = {
    +    val single = data.context.broadcast(singleItem)
    +    data.flatMap(basket => createFPTree(basket, single))
    +      .groupByKey()
    +      .flatMap(partition => runFPTree(partition, minCount))
    +      .collect()
    +  }
    +
    +  /**
    +   * Create FP-Tree partition for the giving basket
    --- End diff --
    
    Same here: missing docs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-70773748
  
    @jackylk I made a brief scan of the implementation. Besides inline comments, I have some high-level suggestions:
    
    1) It would be good if we can make the code easier for someone with the paper in hand. I don't think the notation is good in the PFP paper. But when we create a variable, we should put a comment mentioning the corresponding variable name in the paper.
    2) Maybe for reason 1), I cannot find an exact match between the implementation and the algorithm described in the paper. It seems that you implemented `Figure 1` of the paper but it is not the PFP algorithm. Could you double check?
    3) The tree building code needs some unit tests. So it is easy to convince reviewers that the implementation is correct. And if using trees can compress the data, we should use aggregateByKey instead of groupByKey.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-72232107
  
    @mengxr 
    I have modified according to the comments, please review


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-71420350
  
    @zhangyouhua2014 
    
    > We refer to the PFP paper, but reduces the process of building the tree, omit this process it can use > this time to do other things.
    
    By "reduce", did you mean skipping the process of growing trees? The FP-Growth algorithm reduces memory requirement using the tree representation of candidate sets. If we skip this step, it is hard to call it `FPGrowth`. Did you do any performance comparison between your version and the PFP implementation?
    
    > 2, reduce operations performed using groupByKey operator will conditionSEQ on a machine of the same key into the presence of the same key conditionSEQ worker set on each machine after the merger. The following is based conditionSEQ to mining frequent item sets.
    
    It is important to grow the tree on the mapper side to save communication cost. `groupByKey` doesn't do that. I was suggesting using `aggregateByKey`. For each key, we start with an empty tree, with `seqOp` growing the tree and `combOp` merging two trees. Besides, the partition key is the hash value of the last item of the sequence. We should be able to reduce communication cost (see my inline comments at L135).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-70465070
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25752/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-70458292
  
      [Test build #25752 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25752/consoleFull) for   PR 2847 at commit [`d110ab2`](https://github.com/apache/spark/commit/d110ab2883b0aa85cf0c94f6ca2f12cc0ea73a37).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273412
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    --- End diff --
    
    This is no longer needed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273427
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    +
    +  /**
    +   * Constructs a FPGrowth instance with default parameters:
    +   * {minSupport: 0.5}
    +   */
    +  def this() = this(0.5)
    +
    +  /**
    +   * set the minimal support level, default is 0.5
    +   * @param minSupport minimal support level
    +   */
    +  def setMinSupport(minSupport: Double): this.type = {
    +    this.minSupport = minSupport
    +    this
    +  }
    +
    +  /**
    +   * Compute a FPGrowth Model that contains frequent pattern result.
    +   * @param data input data set
    +   * @return FPGrowth Model
    +   */
    +  def run(data: RDD[Array[String]]): FPGrowthModel = {
    +    val model = runAlgorithm(data)
    +    model
    +  }
    +
    +  /**
    +   * Implementation of PFP.
    +   */
    +  private def runAlgorithm(data: RDD[Array[String]]): FPGrowthModel = {
    +    val count = data.count()
    +    val minCount = minSupport * count
    +    val single = generateSingleItem(data, minCount)
    +    val combinations = generateCombinations(data, minCount, single)
    +    new FPGrowthModel(single ++ combinations)
    +  }
    +
    +  /**
    +   * Generate single item pattern by filtering the input data using minimal support level
    --- End diff --
    
    Should document at least the return value. It would be best document the input arguments as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273450
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    +
    +  /**
    +   * Constructs a FPGrowth instance with default parameters:
    +   * {minSupport: 0.5}
    +   */
    +  def this() = this(0.5)
    +
    +  /**
    +   * set the minimal support level, default is 0.5
    +   * @param minSupport minimal support level
    +   */
    +  def setMinSupport(minSupport: Double): this.type = {
    +    this.minSupport = minSupport
    +    this
    +  }
    +
    +  /**
    +   * Compute a FPGrowth Model that contains frequent pattern result.
    +   * @param data input data set
    +   * @return FPGrowth Model
    +   */
    +  def run(data: RDD[Array[String]]): FPGrowthModel = {
    +    val model = runAlgorithm(data)
    +    model
    +  }
    +
    +  /**
    +   * Implementation of PFP.
    +   */
    +  private def runAlgorithm(data: RDD[Array[String]]): FPGrowthModel = {
    +    val count = data.count()
    +    val minCount = minSupport * count
    +    val single = generateSingleItem(data, minCount)
    +    val combinations = generateCombinations(data, minCount, single)
    +    new FPGrowthModel(single ++ combinations)
    +  }
    +
    +  /**
    +   * Generate single item pattern by filtering the input data using minimal support level
    +   */
    +  private def generateSingleItem(
    +      data: RDD[Array[String]],
    +      minCount: Double): Array[(String, Int)] = {
    +    data.flatMap(v => v)
    +      .map(v => (v, 1))
    +      .reduceByKey(_ + _)
    +      .filter(_._2 >= minCount)
    +      .collect()
    +      .distinct
    +      .sortWith(_._2 > _._2)
    +  }
    +
    +  /**
    +   * Generate combination of items by computing on FPTree,
    +   * the computation is done on each FPTree partitions.
    +   */
    +  private def generateCombinations(
    +      data: RDD[Array[String]],
    +      minCount: Double,
    +      singleItem: Array[(String, Int)]): Array[(String, Int)] = {
    +    val single = data.context.broadcast(singleItem)
    +    data.flatMap(basket => createFPTree(basket, single))
    +      .groupByKey()
    +      .flatMap(partition => runFPTree(partition, minCount))
    +      .collect()
    +  }
    +
    +  /**
    +   * Create FP-Tree partition for the giving basket
    +   */
    +  private def createFPTree(
    +      basket: Array[String],
    +      singleItem: Broadcast[Array[(String, Int)]]): Array[(String, Array[String])] = {
    +    var output = ArrayBuffer[(String, Array[String])]()
    +    var combination = ArrayBuffer[String]()
    +    val single = singleItem.value
    +    var items = ArrayBuffer[(String, Int)]()
    +
    +    // Filter the basket by single item pattern
    +    val iterator = basket.iterator
    +    while (iterator.hasNext){
    +      val item = iterator.next
    +      val opt = single.find(_._1.equals(item))
    +      if (opt != None) {
    +        items ++= opt
    +      }
    +    }
    +
    +    // Sort it and create the item combinations
    +    val sortedItems = items.sortWith(_._1 > _._1).sortWith(_._2 > _._2).toArray
    --- End diff --
    
    Why sorting twice? The second will overwrite the first. Besides, using `sortBy(-_._2)` would be better.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding apriori and fp-grow...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-70041328
  
      [Test build #25596 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25596/consoleFull) for   PR 2847 at commit [`7b77ad7`](https://github.com/apache/spark/commit/7b77ad74a7de9af8e720f37430ce4b75651298be).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273414
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    --- End diff --
    
    try to be explicit on imports


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2847#issuecomment-72246222
  
    @jackylk Thanks for the update! Did you see any performance improvement on your dataset with `aggregateByKey`? I'm quite interested in how much shuffle we can save (hopefully) on a real dataset.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273446
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    +
    +  /**
    +   * Constructs a FPGrowth instance with default parameters:
    +   * {minSupport: 0.5}
    +   */
    +  def this() = this(0.5)
    +
    +  /**
    +   * set the minimal support level, default is 0.5
    +   * @param minSupport minimal support level
    +   */
    +  def setMinSupport(minSupport: Double): this.type = {
    +    this.minSupport = minSupport
    +    this
    +  }
    +
    +  /**
    +   * Compute a FPGrowth Model that contains frequent pattern result.
    +   * @param data input data set
    +   * @return FPGrowth Model
    +   */
    +  def run(data: RDD[Array[String]]): FPGrowthModel = {
    +    val model = runAlgorithm(data)
    +    model
    +  }
    +
    +  /**
    +   * Implementation of PFP.
    +   */
    +  private def runAlgorithm(data: RDD[Array[String]]): FPGrowthModel = {
    +    val count = data.count()
    +    val minCount = minSupport * count
    +    val single = generateSingleItem(data, minCount)
    +    val combinations = generateCombinations(data, minCount, single)
    +    new FPGrowthModel(single ++ combinations)
    +  }
    +
    +  /**
    +   * Generate single item pattern by filtering the input data using minimal support level
    +   */
    +  private def generateSingleItem(
    +      data: RDD[Array[String]],
    +      minCount: Double): Array[(String, Int)] = {
    +    data.flatMap(v => v)
    +      .map(v => (v, 1))
    +      .reduceByKey(_ + _)
    +      .filter(_._2 >= minCount)
    +      .collect()
    +      .distinct
    +      .sortWith(_._2 > _._2)
    +  }
    +
    +  /**
    +   * Generate combination of items by computing on FPTree,
    +   * the computation is done on each FPTree partitions.
    +   */
    +  private def generateCombinations(
    +      data: RDD[Array[String]],
    +      minCount: Double,
    +      singleItem: Array[(String, Int)]): Array[(String, Int)] = {
    +    val single = data.context.broadcast(singleItem)
    +    data.flatMap(basket => createFPTree(basket, single))
    +      .groupByKey()
    +      .flatMap(partition => runFPTree(partition, minCount))
    +      .collect()
    +  }
    +
    +  /**
    +   * Create FP-Tree partition for the giving basket
    +   */
    +  private def createFPTree(
    +      basket: Array[String],
    +      singleItem: Broadcast[Array[(String, Int)]]): Array[(String, Array[String])] = {
    --- End diff --
    
    You can call `single.value` in `generateCombinations` and remove `Broadcast[..]` for this function. This produces better code separation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4001][MLlib] adding parallel FP-Growth ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2847#discussion_r23273434
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.fpm
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast._
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.{ArrayBuffer, Map}
    +
    +/**
    + * This class implements Parallel FPGrowth algorithm to do frequent pattern matching on input data.
    + * Parallel FPGrowth (PFP) partitions computation in such a way that each machine executes an
    + * independent group of mining tasks. More detail of this algorithm can be found at
    + * http://infolab.stanford.edu/~echang/recsys08-69.pdf
    + */
    +class FPGrowth private(private var minSupport: Double) extends Logging with Serializable {
    +
    +  /**
    +   * Constructs a FPGrowth instance with default parameters:
    +   * {minSupport: 0.5}
    +   */
    +  def this() = this(0.5)
    +
    +  /**
    +   * set the minimal support level, default is 0.5
    +   * @param minSupport minimal support level
    +   */
    +  def setMinSupport(minSupport: Double): this.type = {
    +    this.minSupport = minSupport
    +    this
    +  }
    +
    +  /**
    +   * Compute a FPGrowth Model that contains frequent pattern result.
    +   * @param data input data set
    +   * @return FPGrowth Model
    +   */
    +  def run(data: RDD[Array[String]]): FPGrowthModel = {
    +    val model = runAlgorithm(data)
    +    model
    +  }
    +
    +  /**
    +   * Implementation of PFP.
    +   */
    +  private def runAlgorithm(data: RDD[Array[String]]): FPGrowthModel = {
    +    val count = data.count()
    +    val minCount = minSupport * count
    +    val single = generateSingleItem(data, minCount)
    +    val combinations = generateCombinations(data, minCount, single)
    +    new FPGrowthModel(single ++ combinations)
    +  }
    +
    +  /**
    +   * Generate single item pattern by filtering the input data using minimal support level
    +   */
    +  private def generateSingleItem(
    +      data: RDD[Array[String]],
    +      minCount: Double): Array[(String, Int)] = {
    +    data.flatMap(v => v)
    +      .map(v => (v, 1))
    --- End diff --
    
    `1` -> `1L`, if we do need to handle billions of items?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org