You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@predictionio.apache.org by mars <gi...@git.apache.org> on 2017/07/15 00:06:22 UTC

[GitHub] incubator-predictionio pull request #412: Batch Predictions

GitHub user mars opened a pull request:

    https://github.com/apache/incubator-predictionio/pull/412

    Batch Predictions

    JIRA issue [PIO-105](https://issues.apache.org/jira/browse/PIO-105)
    
    Provides a new `pio batchpredict` command.
    
    Reads from multi-object JSON input file. Example:
    
    ```json
    {"user":"1"}
    {"user":"2"}
    {"user":"3"}
    {"user":"4"}
    {"user":"5"}
    ```
    
    Writes to multi-object JSON output file (actually Hadoop partition files). Example:
    
    ```json
    {"query":{"user":"1"},"prediction":{"itemScores":[{"item":"1","score":33},{"item":"2","score":32}]}}
    {"query":{"user":"2"},"prediction":{"itemScores":[{"item":"5","score":55},{"item":"3","score":28}]}}
    {"query":{"user":"3"},"prediction":{"itemScores":[{"item":"2","score":16},{"item":"3","score":12}]}}
    {"query":{"user":"4"},"prediction":{"itemScores":[{"item":"3","score":19},{"item":"1","score":18}]}}
    {"query":{"user":"5"},"prediction":{"itemScores":[{"item":"1","score":24},{"item":"4","score":14}]}}
    ```
    
    See the included [console usage help](#diff-2cf174557564e09d52157be8e839fecf)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mars/incubator-predictionio batch-predict

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-predictionio/pull/412.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #412
    
----
commit 99ee6493bddc8f02aee384f3a2db27c6ae3f68cc
Author: Mars Hall <ma...@heroku.com>
Date:   2017-07-13T00:12:25Z

    Implement BatchPredict

commit c205357498e4a4a745810b04130c5bbad78f8686
Author: Mars Hall <ma...@heroku.com>
Date:   2017-07-14T22:29:26Z

    Improve console help for batch predict.

commit 93f7ed3e5ed10155a688a032e367793d75fa116a
Author: Mars Hall <ma...@heroku.com>
Date:   2017-07-14T22:46:30Z

    Undo experimental change to publish tools artifact

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-predictionio issue #412: [PIO-105] Batch Predictions

Posted by takezoe <gi...@git.apache.org>.
Github user takezoe commented on the issue:

    https://github.com/apache/incubator-predictionio/pull/412
  
    @mars These changes should work as same as original code completely, but difference of code appearance might cause confusion. So these kind of changes might have to be done at once (in an another pull request).
    
    I think it's a reasonable decision that even if you don't apply these changes in this pull request. 👍 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-predictionio issue #412: [PIO-105] Batch Predictions

Posted by dszeto <gi...@git.apache.org>.
Github user dszeto commented on the issue:

    https://github.com/apache/incubator-predictionio/pull/412
  
    Created https://issues.apache.org/jira/browse/PIO-110 and https://issues.apache.org/jira/browse/PIO-111 as follow-ups. Thanks @mars for the feature and @takezoe for the feedback!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-predictionio pull request #412: [PIO-105] Batch Predictions

Posted by takezoe <gi...@git.apache.org>.
Github user takezoe commented on a diff in the pull request:

    https://github.com/apache/incubator-predictionio/pull/412#discussion_r127853016
  
    --- Diff: core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.predictionio.workflow
    +
    +import java.io.Serializable
    +
    +import com.twitter.bijection.Injection
    +import com.twitter.chill.{KryoBase, KryoInjection, ScalaKryoInstantiator}
    +import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer
    +import grizzled.slf4j.Logging
    +import org.apache.predictionio.controller.{Engine, Utils}
    +import org.apache.predictionio.core.{BaseAlgorithm, BaseServing, Doer}
    +import org.apache.predictionio.data.storage.{EngineInstance, Storage}
    +import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
    +import org.apache.spark.rdd.RDD
    +import org.json4s._
    +import org.json4s.native.JsonMethods._
    +import scala.language.existentials
    +
    +case class BatchPredictConfig(
    +  inputFilePath: String = "batchpredict-input.json",
    +  outputFilePath: String = "batchpredict-output.json",
    +  queryPartitions: Option[Int] = None,
    +  engineInstanceId: String = "",
    +  engineId: Option[String] = None,
    +  engineVersion: Option[String] = None,
    +  engineVariant: String = "",
    +  env: Option[String] = None,
    +  verbose: Boolean = false,
    +  debug: Boolean = false,
    +  jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both)
    +
    +object BatchPredict extends Logging {
    +
    +  class KryoInstantiator(classLoader: ClassLoader) extends ScalaKryoInstantiator {
    +    override def newKryo(): KryoBase = {
    +      val kryo = super.newKryo()
    +      kryo.setClassLoader(classLoader)
    +      SynchronizedCollectionsSerializer.registerSerializers(kryo)
    +      kryo
    +    }
    +  }
    +
    +  object KryoInstantiator extends Serializable {
    +    def newKryoInjection : Injection[Any, Array[Byte]] = {
    +      val kryoInstantiator = new KryoInstantiator(getClass.getClassLoader)
    +      KryoInjection.instance(kryoInstantiator)
    +    }
    +  }
    +
    +  val engineInstances = Storage.getMetaDataEngineInstances
    +  val modeldata = Storage.getModelDataModels
    +
    +  def main(args: Array[String]): Unit = {
    +    val parser = new scopt.OptionParser[BatchPredictConfig]("BatchPredict") {
    +      opt[String]("input") action { (x, c) =>
    +        c.copy(inputFilePath = x)
    +      } text("Path to file containing input queries; a " +
    +        "multi-object JSON file with one object per line.")
    +      opt[String]("output") action { (x, c) =>
    +        c.copy(outputFilePath = x)
    +      } text("Path to file containing output predictions; a " +
    +        "multi-object JSON file with one object per line.")
    +      opt[Int]("query-partitions") action { (x, c) =>
    +        c.copy(queryPartitions = Some(x))
    +      } text("Limit concurrency of predictions by setting the number " +
    +        "of partitions used internally for the RDD of queries.")
    +      opt[String]("engineId") action { (x, c) =>
    +        c.copy(engineId = Some(x))
    +      } text("Engine ID.")
    +      opt[String]("engineId") action { (x, c) =>
    +        c.copy(engineId = Some(x))
    +      } text("Engine ID.")
    +      opt[String]("engineVersion") action { (x, c) =>
    +        c.copy(engineVersion = Some(x))
    +      } text("Engine version.")
    +      opt[String]("engine-variant") required() action { (x, c) =>
    +        c.copy(engineVariant = x)
    +      } text("Engine variant JSON.")
    +      opt[String]("env") action { (x, c) =>
    +        c.copy(env = Some(x))
    +      } text("Comma-separated list of environmental variables (in 'FOO=BAR' " +
    +        "format) to pass to the Spark execution environment.")
    +      opt[String]("engineInstanceId") required() action { (x, c) =>
    +        c.copy(engineInstanceId = x)
    +      } text("Engine instance ID.")
    +      opt[Unit]("verbose") action { (x, c) =>
    +        c.copy(verbose = true)
    +      } text("Enable verbose output.")
    +      opt[Unit]("debug") action { (x, c) =>
    +        c.copy(debug = true)
    +      } text("Enable debug output.")
    +      opt[String]("json-extractor") action { (x, c) =>
    +        c.copy(jsonExtractor = JsonExtractorOption.withName(x))
    +      }
    +    }
    +
    +    parser.parse(args, BatchPredictConfig()) map { config =>
    +      WorkflowUtils.modifyLogging(config.verbose)
    +      engineInstances.get(config.engineInstanceId) map { engineInstance =>
    +
    +        val engine = getEngine(engineInstance)
    +
    +        run(config, engineInstance, engine)
    +
    +      } getOrElse {
    +        error(s"Invalid engine instance ID. Aborting batch predict.")
    +      }
    +    }
    +  }
    +
    +  def getEngine(engineInstance: EngineInstance): Engine[_, _, _, _, _, _] = {
    +
    +    val engineFactoryName = engineInstance.engineFactory
    +
    +    val (engineLanguage, engineFactory) =
    +      WorkflowUtils.getEngine(engineFactoryName, getClass.getClassLoader)
    +    val maybeEngine = engineFactory()
    +
    +    // EngineFactory return a base engine, which may not be deployable.
    +    if (!maybeEngine.isInstanceOf[Engine[_,_,_,_,_,_]]) {
    +      throw new NoSuchMethodException(
    +        s"Engine $maybeEngine cannot be used for batch predict")
    +    }
    +
    +    maybeEngine.asInstanceOf[Engine[_,_,_,_,_,_]]
    +  }
    +
    +  def run[Q, P](
    +    config: BatchPredictConfig,
    +    engineInstance: EngineInstance,
    +    engine: Engine[_, _, _, Q, P, _]): Unit = {
    +
    +    val engineParams = engine.engineInstanceToEngineParams(
    +      engineInstance, config.jsonExtractor)
    +
    +    val kryo = KryoInstantiator.newKryoInjection
    +
    +    val modelsFromEngineInstance =
    +      kryo.invert(modeldata.get(engineInstance.id).get.models).get.
    +      asInstanceOf[Seq[Any]]
    +
    +    val prepareSparkContext = WorkflowContext(
    +      batch = engineInstance.engineFactory,
    +      executorEnv = engineInstance.env,
    +      mode = "Batch Predict (model)",
    +      sparkEnv = engineInstance.sparkConf)
    +
    +    val models = engine.prepareDeploy(
    +      prepareSparkContext,
    +      engineParams,
    +      engineInstance.id,
    +      modelsFromEngineInstance,
    +      params = WorkflowParams()
    +    )
    +
    +    val algorithms = engineParams.algorithmParamsList.map { case (n, p) =>
    +      Doer(engine.algorithmClassMap(n), p)
    +    }
    +
    +    val servingParamsWithName = engineParams.servingParams
    +
    +    val serving = Doer(engine.servingClassMap(servingParamsWithName._1),
    +      servingParamsWithName._2)
    +
    +    val runSparkContext = WorkflowContext(
    +      batch = engineInstance.engineFactory,
    +      executorEnv = engineInstance.env,
    +      mode = "Batch Predict (runner)",
    +      sparkEnv = engineInstance.sparkConf)
    +
    +    val inputRDD: RDD[String] = runSparkContext.textFile(config.inputFilePath)
    +    val queriesRDD: RDD[String] = config.queryPartitions match {
    +      case Some(p) => inputRDD.repartition(p)
    +      case None => inputRDD
    +    }
    +
    +    val predictionsRDD: RDD[String] = queriesRDD.map { queryString =>
    +      val jsonExtractorOption = config.jsonExtractor
    +      // Extract Query from Json
    +      val query = JsonExtractor.extract(
    +        jsonExtractorOption,
    +        queryString,
    +        algorithms.head.queryClass,
    +        algorithms.head.querySerializer,
    +        algorithms.head.gsonTypeAdapterFactories
    +      )
    +      // Deploy logic. First call Serving.supplement, then Algo.predict,
    +      // finally Serving.serve.
    +      val supplementedQuery = serving.supplementBase(query)
    +      // TODO: Parallelize the following.
    +      val predictions = algorithms.zipWithIndex.map { case (a, ai) =>
    +        a.predictBase(models(ai), supplementedQuery)
    +      }
    --- End diff --
    
    Zipping predictions and models directly is simpler.
    
    ```scala
    val predictions = algorithms.zip(models).map { case (a, m) =>
      a.predictBase(m, supplementedQuery)
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-predictionio pull request #412: [PIO-105] Batch Predictions

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-predictionio/pull/412


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-predictionio pull request #412: [PIO-105] Batch Predictions

Posted by takezoe <gi...@git.apache.org>.
Github user takezoe commented on a diff in the pull request:

    https://github.com/apache/incubator-predictionio/pull/412#discussion_r127857163
  
    --- Diff: tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala ---
    @@ -262,6 +263,56 @@ object Engine extends EitherLogging {
         }
       }
     
    +  /** Batch predict with an engine.
    +    *
    +    * @param ea An instance of [[EngineArgs]]
    +    * @param engineInstanceId An instance of [[engineInstanceId]]
    +    * @param batchPredictArgs An instance of [[BatchPredictArgs]]
    +    * @param sparkArgs An instance of [[SparkArgs]]
    +    * @param pioHome [[String]] with a path to PIO installation
    +    * @param verbose A [[Boolean]]
    +    * @return An instance of [[Expected]] contaning either [[Left]]
    +    *         with an error message or [[Right]] with a handle to process
    +    *         of a running angine  and a function () => Unit,
    +    *         that must be called when the process is complete
    +    */
    +  def batchPredict(
    +    ea: EngineArgs,
    +    engineInstanceId: Option[String],
    +    batchPredictArgs: BatchPredictArgs,
    +    sparkArgs: SparkArgs,
    +    pioHome: String,
    +    verbose: Boolean = false): Expected[(Process, () => Unit)] = {
    +
    +    val engineDirPath = getEngineDirPath(ea.engineDir)
    +    val verifyResult = Template.verifyTemplateMinVersion(
    +      new File(engineDirPath, "template.json"))
    +    if (verifyResult.isLeft) {
    +      return Left(verifyResult.left.get)
    --- End diff --
    
    Generally, avoiding return is preferred in Scala if it's possible. I think following is better:
    
    ```scala
    verifyResult match {
      case x @ Left(err) => x
      case Right(_) => {
      	...
      }
    }
    ```
    
    or this is bit shorter:
    
    ```scala
    verifyResult.right.flatMap { _ =>
      ...
    }
    ```
    
    However, there are some other `return`s in this file, so it might be better to fully refactor in an another pull request.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-predictionio pull request #412: [PIO-105] Batch Predictions

Posted by takezoe <gi...@git.apache.org>.
Github user takezoe commented on a diff in the pull request:

    https://github.com/apache/incubator-predictionio/pull/412#discussion_r127852943
  
    --- Diff: core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.predictionio.workflow
    +
    +import java.io.Serializable
    +
    +import com.twitter.bijection.Injection
    +import com.twitter.chill.{KryoBase, KryoInjection, ScalaKryoInstantiator}
    +import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer
    +import grizzled.slf4j.Logging
    +import org.apache.predictionio.controller.{Engine, Utils}
    +import org.apache.predictionio.core.{BaseAlgorithm, BaseServing, Doer}
    +import org.apache.predictionio.data.storage.{EngineInstance, Storage}
    +import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
    +import org.apache.spark.rdd.RDD
    +import org.json4s._
    +import org.json4s.native.JsonMethods._
    +import scala.language.existentials
    +
    +case class BatchPredictConfig(
    +  inputFilePath: String = "batchpredict-input.json",
    +  outputFilePath: String = "batchpredict-output.json",
    +  queryPartitions: Option[Int] = None,
    +  engineInstanceId: String = "",
    +  engineId: Option[String] = None,
    +  engineVersion: Option[String] = None,
    +  engineVariant: String = "",
    +  env: Option[String] = None,
    +  verbose: Boolean = false,
    +  debug: Boolean = false,
    +  jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both)
    +
    +object BatchPredict extends Logging {
    +
    +  class KryoInstantiator(classLoader: ClassLoader) extends ScalaKryoInstantiator {
    +    override def newKryo(): KryoBase = {
    +      val kryo = super.newKryo()
    +      kryo.setClassLoader(classLoader)
    +      SynchronizedCollectionsSerializer.registerSerializers(kryo)
    +      kryo
    +    }
    +  }
    +
    +  object KryoInstantiator extends Serializable {
    +    def newKryoInjection : Injection[Any, Array[Byte]] = {
    +      val kryoInstantiator = new KryoInstantiator(getClass.getClassLoader)
    +      KryoInjection.instance(kryoInstantiator)
    +    }
    +  }
    +
    +  val engineInstances = Storage.getMetaDataEngineInstances
    +  val modeldata = Storage.getModelDataModels
    +
    +  def main(args: Array[String]): Unit = {
    +    val parser = new scopt.OptionParser[BatchPredictConfig]("BatchPredict") {
    +      opt[String]("input") action { (x, c) =>
    +        c.copy(inputFilePath = x)
    +      } text("Path to file containing input queries; a " +
    +        "multi-object JSON file with one object per line.")
    +      opt[String]("output") action { (x, c) =>
    +        c.copy(outputFilePath = x)
    +      } text("Path to file containing output predictions; a " +
    +        "multi-object JSON file with one object per line.")
    +      opt[Int]("query-partitions") action { (x, c) =>
    +        c.copy(queryPartitions = Some(x))
    +      } text("Limit concurrency of predictions by setting the number " +
    +        "of partitions used internally for the RDD of queries.")
    +      opt[String]("engineId") action { (x, c) =>
    +        c.copy(engineId = Some(x))
    +      } text("Engine ID.")
    +      opt[String]("engineId") action { (x, c) =>
    +        c.copy(engineId = Some(x))
    +      } text("Engine ID.")
    +      opt[String]("engineVersion") action { (x, c) =>
    +        c.copy(engineVersion = Some(x))
    +      } text("Engine version.")
    +      opt[String]("engine-variant") required() action { (x, c) =>
    +        c.copy(engineVariant = x)
    +      } text("Engine variant JSON.")
    +      opt[String]("env") action { (x, c) =>
    +        c.copy(env = Some(x))
    +      } text("Comma-separated list of environmental variables (in 'FOO=BAR' " +
    +        "format) to pass to the Spark execution environment.")
    +      opt[String]("engineInstanceId") required() action { (x, c) =>
    +        c.copy(engineInstanceId = x)
    +      } text("Engine instance ID.")
    +      opt[Unit]("verbose") action { (x, c) =>
    +        c.copy(verbose = true)
    +      } text("Enable verbose output.")
    +      opt[Unit]("debug") action { (x, c) =>
    +        c.copy(debug = true)
    +      } text("Enable debug output.")
    +      opt[String]("json-extractor") action { (x, c) =>
    +        c.copy(jsonExtractor = JsonExtractorOption.withName(x))
    +      }
    +    }
    +
    +    parser.parse(args, BatchPredictConfig()) map { config =>
    +      WorkflowUtils.modifyLogging(config.verbose)
    +      engineInstances.get(config.engineInstanceId) map { engineInstance =>
    +
    +        val engine = getEngine(engineInstance)
    +
    +        run(config, engineInstance, engine)
    +
    +      } getOrElse {
    +        error(s"Invalid engine instance ID. Aborting batch predict.")
    +      }
    +    }
    +  }
    +
    +  def getEngine(engineInstance: EngineInstance): Engine[_, _, _, _, _, _] = {
    +
    +    val engineFactoryName = engineInstance.engineFactory
    +
    +    val (engineLanguage, engineFactory) =
    +      WorkflowUtils.getEngine(engineFactoryName, getClass.getClassLoader)
    +    val maybeEngine = engineFactory()
    +
    +    // EngineFactory return a base engine, which may not be deployable.
    --- End diff --
    
    You can check type and cast at once by using pattern match:
    
    ```scala
    maybeEngine match {
      case e: Engine[_, _, _, _, _, _] => e
      case _ => throw new NoSuchMethodException(
        s"Engine $maybeEngine cannot be used for batch predict")
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-predictionio issue #412: [PIO-105] Batch Predictions

Posted by dszeto <gi...@git.apache.org>.
Github user dszeto commented on the issue:

    https://github.com/apache/incubator-predictionio/pull/412
  
    LGTM. Great work @mars !
    
    Agree with @takezoe 's comments. I think there are a few follow up tasks we should do.
    
    1. Refactor common code that exists in both `CreateServer` and `BatchPredict`.
    2. Brush up Scala coding style as pointed out by @takezoe .
    3. Document the new feature in the doc site.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-predictionio issue #412: [PIO-105] Batch Predictions

Posted by mars <gi...@git.apache.org>.
Github user mars commented on the issue:

    https://github.com/apache/incubator-predictionio/pull/412
  
    @takezoe thank you for the feedback. As a relatively-new Scala programmer I really appreciate this kind of review.
    
    I am a bit hesitant to make these changes. I'm trying to maintain likeness with the [`CreateServer.scala`](https://github.com/mars/incubator-predictionio/blob/e7c6ebd8cfe2d4a150319025876520fc39be9a34/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala) code, to minimize differences in prediction behavior between `pio deploy` and `pio batchpredict`. Any of these stylistic points should probably be matched in CreateServer, so that it continues to be easy to reason about their similarity.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---