You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by sethah <gi...@git.apache.org> on 2017/12/12 14:58:40 UTC
[GitHub] spark pull request #19876: [WIP][ML][SPARK-11171][SPARK-11239] Add PMML expo...
Github user sethah commented on a diff in the pull request:
https://github.com/apache/spark/pull/19876#discussion_r155157785
--- Diff: mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala ---
@@ -126,15 +180,69 @@ abstract class MLWriter extends BaseReadWrite with Logging {
this
}
+ // override for Java compatibility
+ override def session(sparkSession: SparkSession): this.type = super.session(sparkSession)
+
+ // override for Java compatibility
+ override def context(sqlContext: SQLContext): this.type = super.session(sqlContext.sparkSession)
+}
+
+/**
+ * A ML Writer which delegates based on the requested format.
+ */
+class GeneralMLWriter(stage: PipelineStage) extends MLWriter with Logging {
+ private var source: String = "internal"
+
/**
- * Overwrites if the output path already exists.
+ * Specifies the format of ML export (e.g. PMML, internal, or
+ * the fully qualified class name for export).
*/
- @Since("1.6.0")
- def overwrite(): this.type = {
- shouldOverwrite = true
+ @Since("2.3.0")
+ def format(source: String): this.type = {
+ this.source = source
this
}
+ /**
+ * Dispatches the save to the correct MLFormat.
+ */
+ @Since("2.3.0")
+ @throws[IOException]("If the input path already exists but overwrite is not enabled.")
+ @throws[SparkException]("If multiple sources for a given short name format are found.")
+ override protected def saveImpl(path: String) = {
+ val loader = Utils.getContextOrSparkClassLoader
+ val serviceLoader = ServiceLoader.load(classOf[MLFormatRegister], loader)
+ val stageName = stage.getClass.getName
+ val targetName = s"${source}+${stageName}"
+ val formats = serviceLoader.asScala.toList
+ val shortNames = formats.map(_.shortName())
+ val writerCls = formats.filter(_.shortName().equalsIgnoreCase(targetName)) match {
+ // requested name did not match any given registered alias
+ case Nil =>
+ Try(loader.loadClass(source)) match {
+ case Success(writer) =>
+ // Found the ML writer using the fully qualified path
+ writer
+ case Failure(error) =>
+ throw new SparkException(
+ s"Could not load requested format $source for $stageName ($targetName) had $formats" +
+ s"supporting $shortNames", error)
+ }
+ case head :: Nil =>
+ head.getClass
+ case _ =>
+ // Multiple sources
+ throw new SparkException(
+ s"Multiple writers found for $source+$stageName, try using the class name of the writer")
+ }
+ if (classOf[MLWriterFormat].isAssignableFrom(writerCls)) {
+ val writer = writerCls.newInstance().asInstanceOf[MLWriterFormat]
+ writer.write(path, sparkSession, optionMap, stage)
+ } else {
+ throw new SparkException("ML source $source is not a valid MLWriterFormat")
--- End diff --
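nit: this message needs the `s` string interpolator, i.e. s"ML source $source is not a valid MLWriterFormat"; as a plain string literal, `$source` is emitted verbatim instead of the requested format name.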
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org