Posted to reviews@spark.apache.org by jkbradley <gi...@git.apache.org> on 2018/04/03 23:55:20 UTC

[GitHub] spark pull request #15770: [SPARK-15784][ML] Add Power Iteration Clustering ...

Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15770#discussion_r178983843
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.ml.clustering
    +
    +import org.apache.spark.annotation.{Experimental, Since}
    +import org.apache.spark.ml.Transformer
    +import org.apache.spark.ml.linalg.Vector
    +import org.apache.spark.ml.param._
    +import org.apache.spark.ml.param.shared._
    +import org.apache.spark.ml.util._
    +import org.apache.spark.mllib.clustering.{PowerIterationClustering => MLlibPowerIterationClustering}
    +import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.{DataFrame, Dataset, Row}
    +import org.apache.spark.sql.functions.col
    +import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
    +
    +/**
    + * Common params for PowerIterationClustering
    + */
    +private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter
    +  with HasFeaturesCol with HasPredictionCol with HasWeightCol {
    +
    +  /**
    +   * The number of clusters to create (k). Must be &gt; 1. Default: 2.
    +   * @group param
    +   */
    +  @Since("2.3.0")
    +  final val k = new IntParam(this, "k", "The number of clusters to create. " +
    +    "Must be > 1.", ParamValidators.gt(1))
    +
    +  /** @group getParam */
    +  @Since("2.3.0")
    +  def getK: Int = $(k)
    +
    +  /**
    +   * Param for the initialization algorithm. This can be either "random" to use a random vector
    +   * as vertex properties, or "degree" to use normalized sum similarities. Default: random.
    +   */
    +  @Since("2.3.0")
    +  final val initMode = {
    +    val allowedParams = ParamValidators.inArray(Array("random", "degree"))
    +    new Param[String](this, "initMode", "The initialization algorithm. " +
    +      "Supported options: 'random' and 'degree'.", allowedParams)
    +  }
    +
    +  /** @group expertGetParam */
    +  @Since("2.3.0")
    +  def getInitMode: String = $(initMode)
    +
    +  /**
    +   * Param for the column name for ids returned by PowerIterationClustering.transform().
    +   * Default: "id"
    +   * @group param
    +   */
    +  @Since("2.3.0")
    +  val idCol = new Param[String](this, "id", "column name for ids.")
    +
    +  /** @group getParam */
    +  @Since("2.3.0")
    +  def getIdCol: String = $(idCol)
    +
    +  /**
    +   * Param for the column name for neighbors required by PowerIterationClustering.transform().
    +   * Default: "neighbor"
    +   * @group param
    +   */
    +  @Since("2.3.0")
    +  val neighborCol = new Param[String](this, "neighbor", "column name for neighbors.")
    +
    +  /** @group getParam */
    +  @Since("2.3.0")
    +  def getNeighborCol: String = $(neighborCol)
    +
    +  /**
    +   * Validates the input schema
    +   * @param schema input schema
    +   */
    +  protected def validateSchema(schema: StructType): Unit = {
    +    SchemaUtils.checkColumnType(schema, $(idCol), LongType)
    +    SchemaUtils.checkColumnType(schema, $(predictionCol), IntegerType)
    +  }
    +}
    +
    +/**
    + * :: Experimental ::
    + * Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by
    + * <a href=http://www.icml2010.org/papers/387.pdf>Lin and Cohen</a>. From the abstract:
    + * PIC finds a very low-dimensional embedding of a dataset using truncated power
    + * iteration on a normalized pair-wise similarity matrix of the data.
    + *
    + * Note that we implement [[PowerIterationClustering]] as a transformer. [[transform]] is an
    + * expensive operation, because it runs the PIC algorithm to cluster the whole input dataset.
    + *
    + * @see <a href=http://en.wikipedia.org/wiki/Spectral_clustering>
    + * Spectral clustering (Wikipedia)</a>
    + */
    +@Since("2.3.0")
    +@Experimental
    +class PowerIterationClustering private[clustering] (
    +    @Since("2.3.0") override val uid: String)
    +  extends Transformer with PowerIterationClusteringParams with DefaultParamsWritable {
    +
    +  setDefault(
    +    k -> 2,
    +    maxIter -> 20,
    +    initMode -> "random",
    +    idCol -> "id",
    +    weightCol -> "weight",
    +    neighborCol -> "neighbor")
    +
    +  @Since("2.3.0")
    +  override def copy(extra: ParamMap): PowerIterationClustering = defaultCopy(extra)
    +
    +  @Since("2.3.0")
    +  def this() = this(Identifiable.randomUID("PowerIterationClustering"))
    +
    +  /** @group setParam */
    +  @Since("2.3.0")
    +  def setFeaturesCol(value: String): this.type = set(featuresCol, value)
    +
    +  /** @group setParam */
    +  @Since("2.3.0")
    +  def setPredictionCol(value: String): this.type = set(predictionCol, value)
    +
    +  /** @group setParam */
    +  @Since("2.3.0")
    +  def setK(value: Int): this.type = set(k, value)
    +
    +  /** @group expertSetParam */
    +  @Since("2.3.0")
    +  def setInitMode(value: String): this.type = set(initMode, value)
    +
    +  /** @group setParam */
    +  @Since("2.3.0")
    +  def setMaxIter(value: Int): this.type = set(maxIter, value)
    +
    +  /** @group setParam */
    +  @Since("2.3.0")
    +  def setIdCol(value: String): this.type = set(idCol, value)
    +
    +  /**
    +   * Sets the value of param [[weightCol]].
    +   * Default is "weight"
    +   *
    +   * @group setParam
    +   */
    +  @Since("2.3.0")
    +  def setWeightCol(value: String): this.type = set(weightCol, value)
    +
    +  /**
    +   * Sets the value of param [[neighborCol]].
    +   * Default is "neighbor"
    +   *
    +   * @group setParam
    +   */
    +  @Since("2.3.0")
    +  def setNeighborCol(value: String): this.type = set(neighborCol, value)
    +
    +  @Since("2.3.0")
    +  override def transform(dataset: Dataset[_]): DataFrame = {
    +    val sparkSession = dataset.sparkSession
    +    val rdd: RDD[(Long, Long, Double)] =
    +      dataset.select(col($(idCol)), col($(neighborCol)), col($(weightCol))).rdd.flatMap {
    +        case Row(id: Long, nbr: Vector, weight: Vector) =>
    --- End diff --
    
    I agree about not checking for symmetry as long as we document it.
    
    But I do have one suggestion: Let's take neighbors and weights as Arrays, not Vectors.  That may help prevent users from mistakenly passing in feature Vectors.
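    
    For concreteness, a minimal sketch of the Array-based variant (hypothetical, not part of this diff: it assumes idCol stays LongType while neighborCol and weightCol become ArrayType(LongType) and ArrayType(DoubleType)):
    
        val rdd: RDD[(Long, Long, Double)] =
          dataset.select(col($(idCol)), col($(neighborCol)), col($(weightCol))).rdd.flatMap { r =>
            // getSeq is how Spark SQL surfaces ArrayType columns from a Row.
            val id = r.getLong(0)
            val neighbors = r.getSeq[Long](1)
            val weights = r.getSeq[Double](2)
            // Emit one (src, dst, weight) edge per neighbor/weight pair.
            neighbors.zip(weights).map { case (nbr, w) => (id, nbr, w) }
          }
    
    An ArrayType(LongType) column also keeps neighbor ids as longs, whereas a Vector forces them through Double.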


---
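
As a usage sketch against the API proposed in this diff (hypothetical data; assumes a SparkSession `spark` in scope, the default column names from setDefault, and the Vector-based neighbor/weight columns of the current diff rather than the Array suggestion above):

    import org.apache.spark.ml.clustering.PowerIterationClustering
    import org.apache.spark.ml.linalg.Vectors

    // Each row: a vertex id, its neighbors' ids, and the matching edge
    // weights, under the default column names "id"/"neighbor"/"weight".
    val graph = spark.createDataFrame(Seq(
      (0L, Vectors.dense(1.0, 2.0), Vectors.dense(0.9, 0.5)),
      (1L, Vectors.dense(0.0, 2.0), Vectors.dense(0.9, 0.8)),
      (2L, Vectors.dense(0.0, 1.0), Vectors.dense(0.5, 0.8))
    )).toDF("id", "neighbor", "weight")

    val pic = new PowerIterationClustering()
      .setK(2)
      .setMaxIter(10)

    // transform() runs the full PIC clustering, so it is expensive; the
    // output adds the integer prediction column alongside the ids.
    val assignments = pic.transform(graph)

Note how the neighbor ids must be encoded as doubles inside the Vectors here, which is exactly the confusion the Array suggestion above would avoid.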

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org