You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by sethah <gi...@git.apache.org> on 2017/12/12 14:58:40 UTC

[GitHub] spark pull request #19876: [WIP][ML][SPARK-11171][SPARK-11239] Add PMML expo...

Github user sethah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19876#discussion_r155157785
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala ---
    @@ -126,15 +180,69 @@ abstract class MLWriter extends BaseReadWrite with Logging {
         this
       }
     
    +  // override for Java compatibility
    +  override def session(sparkSession: SparkSession): this.type = super.session(sparkSession)
    +
    +  // override for Java compatibility
    +  override def context(sqlContext: SQLContext): this.type = super.session(sqlContext.sparkSession)
    +}
    +
    +/**
    + * A ML Writer which delegates based on the requested format.
    + */
    +class GeneralMLWriter(stage: PipelineStage) extends MLWriter with Logging {
    +  private var source: String = "internal"
    +
       /**
    -   * Overwrites if the output path already exists.
    +   * Specifies the format of ML export (e.g. PMML, internal, or
    +   * the fully qualified class name for export).
        */
    -  @Since("1.6.0")
    -  def overwrite(): this.type = {
    -    shouldOverwrite = true
    +  @Since("2.3.0")
    +  def format(source: String): this.type = {
    +    this.source = source
         this
       }
     
    +  /**
    +   * Dispatches the save to the correct MLFormat.
    +   */
    +  @Since("2.3.0")
    +  @throws[IOException]("If the input path already exists but overwrite is not enabled.")
    +  @throws[SparkException]("If multiple sources for a given short name format are found.")
    +  override protected def saveImpl(path: String) = {
    +    val loader = Utils.getContextOrSparkClassLoader
    +    val serviceLoader = ServiceLoader.load(classOf[MLFormatRegister], loader)
    +    val stageName = stage.getClass.getName
    +    val targetName = s"${source}+${stageName}"
    +    val formats = serviceLoader.asScala.toList
    +    val shortNames = formats.map(_.shortName())
    +    val writerCls = formats.filter(_.shortName().equalsIgnoreCase(targetName)) match {
    +      // requested name did not match any given registered alias
    +      case Nil =>
    +        Try(loader.loadClass(source)) match {
    +          case Success(writer) =>
    +            // Found the ML writer using the fully qualified path
    +            writer
    +          case Failure(error) =>
    +            throw new SparkException(
    +              s"Could not load requested format $source for $stageName ($targetName) had $formats" +
    +              s"supporting $shortNames", error)
    +        }
    +      case head :: Nil =>
    +        head.getClass
    +      case _ =>
    +        // Multiple sources
    +        throw new SparkException(
    +          s"Multiple writers found for $source+$stageName, try using the class name of the writer")
    +    }
    +    if (classOf[MLWriterFormat].isAssignableFrom(writerCls)) {
    +      val writer = writerCls.newInstance().asInstanceOf[MLWriterFormat]
    +      writer.write(path, sparkSession, optionMap, stage)
    +    } else {
    +      throw new SparkException("ML source $source is not a valid MLWriterFormat")
    --- End diff --
    
    nit: need string interpolation here


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org