You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by ch...@apache.org on 2017/09/27 22:10:56 UTC

[21/50] [abbrv] incubator-predictionio git commit: [PIO-105] Batch Predictions

[PIO-105] Batch Predictions

Implement a new pio batchpredict command to enable massive, fast,
batch predictions from a trained model. Read a multi-object JSON file as
the input format, with one query object per line. Similarly, write
results to a multi-object JSON file, with one prediction result + its
original query per line.

Closes #412


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/cfa3f5da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/cfa3f5da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/cfa3f5da

Branch: refs/heads/master
Commit: cfa3f5dab533a67688ec2f84182eccedc56fa84e
Parents: 965c73f
Author: Mars Hall <ma...@heroku.com>
Authored: Tue Aug 1 14:56:47 2017 -0700
Committer: Donald Szeto <do...@apache.org>
Committed: Tue Aug 1 14:56:47 2017 -0700

----------------------------------------------------------------------
 .../predictionio/workflow/BatchPredict.scala    | 230 +++++++++++++++++++
 .../predictionio/tools/RunBatchPredict.scala    |  72 ++++++
 .../predictionio/tools/commands/Engine.scala    |  55 ++++-
 .../predictionio/tools/console/Console.scala    |  58 ++++-
 .../apache/predictionio/tools/console/Pio.scala |  13 +-
 .../tools/console/batchpredict.scala.txt        |  25 ++
 .../predictionio/tools/console/main.scala.txt   |   1 +
 7 files changed, 450 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
new file mode 100644
index 0000000..2fb0545
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.workflow
+
+import java.io.Serializable
+
+import com.twitter.bijection.Injection
+import com.twitter.chill.{KryoBase, KryoInjection, ScalaKryoInstantiator}
+import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer
+import grizzled.slf4j.Logging
+import org.apache.predictionio.controller.{Engine, Utils}
+import org.apache.predictionio.core.{BaseAlgorithm, BaseServing, Doer}
+import org.apache.predictionio.data.storage.{EngineInstance, Storage}
+import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
+import org.apache.spark.rdd.RDD
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import scala.language.existentials
+
+/** Command-line configuration for the BatchPredict Spark job.
+  *
+  * @param inputFilePath path to the input queries, one JSON object per line
+  * @param outputFilePath path that receives one JSON result object per line
+  * @param queryPartitions when set, the query RDD is repartitioned to this
+  *                        many partitions
+  * @param engineInstanceId ID of the trained engine instance to load
+  * @param engineId engine ID
+  * @param engineVersion engine version
+  * @param engineVariant engine variant JSON
+  * @param env comma-separated list of environment variables in 'FOO=BAR'
+  *            format for the Spark execution environment
+  * @param verbose enable verbose output
+  * @param debug enable debug output
+  * @param jsonExtractor JSON extractor used to parse queries and serialize
+  *                      results
+  */
+case class BatchPredictConfig(
+  inputFilePath: String = "batchpredict-input.json",
+  outputFilePath: String = "batchpredict-output.json",
+  queryPartitions: Option[Int] = None,
+  engineInstanceId: String = "",
+  engineId: Option[String] = None,
+  engineVersion: Option[String] = None,
+  engineVariant: String = "",
+  env: Option[String] = None,
+  verbose: Boolean = false,
+  debug: Boolean = false,
+  jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both
+)
+
+/** Spark job behind `pio batchpredict`.
+  *
+  * Loads a trained engine instance and reads queries from `--input` (one JSON
+  * object per line, blank lines skipped). Each query goes through
+  * `Serving.supplement`, every algorithm's `predict`, and `Serving.serve`.
+  * One JSON object with "query" and "prediction" keys is written per line to
+  * `--output` via `saveAsTextFile`.
+  */
+object BatchPredict extends Logging {
+
+  /** Kryo instantiator that resolves classes through `classLoader` and
+    * registers serializers for synchronized Java collections.
+    */
+  class KryoInstantiator(classLoader: ClassLoader) extends ScalaKryoInstantiator {
+    override def newKryo(): KryoBase = {
+      val kryo = super.newKryo()
+      kryo.setClassLoader(classLoader)
+      SynchronizedCollectionsSerializer.registerSerializers(kryo)
+      kryo
+    }
+  }
+
+  object KryoInstantiator extends Serializable {
+    /** Builds the Kryo injection used to decode persisted model bytes. */
+    def newKryoInjection : Injection[Any, Array[Byte]] = {
+      val kryoInstantiator = new KryoInstantiator(getClass.getClassLoader)
+      KryoInjection.instance(kryoInstantiator)
+    }
+  }
+
+  val engineInstances = Storage.getMetaDataEngineInstances
+  val modeldata = Storage.getModelDataModels
+
+  /** Parses arguments, resolves the engine instance and runs the batch.
+    *
+    * Exits with status 1 when the arguments are invalid or the engine
+    * instance ID is unknown, so the launching process can detect failure.
+    */
+  def main(args: Array[String]): Unit = {
+    val parser = new scopt.OptionParser[BatchPredictConfig]("BatchPredict") {
+      opt[String]("input") action { (x, c) =>
+        c.copy(inputFilePath = x)
+      } text("Path to file containing input queries; a " +
+        "multi-object JSON file with one object per line.")
+      opt[String]("output") action { (x, c) =>
+        c.copy(outputFilePath = x)
+      } text("Path to file containing output predictions; a " +
+        "multi-object JSON file with one object per line.")
+      opt[Int]("query-partitions") action { (x, c) =>
+        c.copy(queryPartitions = Some(x))
+      } text("Limit concurrency of predictions by setting the number " +
+        "of partitions used internally for the RDD of queries.")
+      opt[String]("engineId") action { (x, c) =>
+        c.copy(engineId = Some(x))
+      } text("Engine ID.")
+      opt[String]("engineVersion") action { (x, c) =>
+        c.copy(engineVersion = Some(x))
+      } text("Engine version.")
+      opt[String]("engine-variant") required() action { (x, c) =>
+        c.copy(engineVariant = x)
+      } text("Engine variant JSON.")
+      opt[String]("env") action { (x, c) =>
+        c.copy(env = Some(x))
+      } text("Comma-separated list of environmental variables (in 'FOO=BAR' " +
+        "format) to pass to the Spark execution environment.")
+      opt[String]("engineInstanceId") required() action { (x, c) =>
+        c.copy(engineInstanceId = x)
+      } text("Engine instance ID.")
+      opt[Unit]("verbose") action { (_, c) =>
+        c.copy(verbose = true)
+      } text("Enable verbose output.")
+      opt[Unit]("debug") action { (_, c) =>
+        c.copy(debug = true)
+      } text("Enable debug output.")
+      opt[String]("json-extractor") action { (x, c) =>
+        c.copy(jsonExtractor = JsonExtractorOption.withName(x))
+      } validate { x =>
+        // Same check as the `pio batchpredict` console option, so an unknown
+        // name is reported as a usage error listing the valid choices.
+        if (JsonExtractorOption.values.map(_.toString).contains(x)) {
+          success
+        } else {
+          val validOptions = JsonExtractorOption.values.mkString("|")
+          failure(s"$x is not a valid json-extractor option [$validOptions]")
+        }
+      } text("JSON extractor used to parse queries and serialize results.")
+    }
+
+    parser.parse(args, BatchPredictConfig()) match {
+      case Some(config) =>
+        WorkflowUtils.modifyLogging(config.verbose)
+        engineInstances.get(config.engineInstanceId) match {
+          case Some(engineInstance) =>
+            val engine = getEngine(engineInstance)
+            run(config, engineInstance, engine)
+          case None =>
+            error("Invalid engine instance ID. Aborting batch predict.")
+            // Non-zero status so whoever launched this job sees the failure.
+            sys.exit(1)
+        }
+      case None =>
+        // The parser has already reported what was wrong with the arguments.
+        sys.exit(1)
+    }
+  }
+
+  /** Instantiates the engine recorded in `engineInstance`.
+    *
+    * @throws NoSuchMethodException if the factory returns a base engine that
+    *         is not an [[Engine]] and so cannot serve predictions
+    */
+  def getEngine(engineInstance: EngineInstance): Engine[_, _, _, _, _, _] = {
+
+    val engineFactoryName = engineInstance.engineFactory
+
+    val (_, engineFactory) =
+      WorkflowUtils.getEngine(engineFactoryName, getClass.getClassLoader)
+
+    // EngineFactory returns a base engine, which may not be deployable.
+    engineFactory() match {
+      case engine: Engine[_, _, _, _, _, _] => engine
+      case other =>
+        throw new NoSuchMethodException(
+          s"Engine $other cannot be used for batch predict")
+    }
+  }
+
+  /** Runs batch predictions for one engine instance.
+    *
+    * @param config parsed command-line configuration
+    * @param engineInstance engine instance whose persisted models are used
+    * @param engine engine built from the instance's factory
+    * @throws NoSuchElementException if no model data is stored for the instance
+    * @throws IllegalArgumentException if the engine params list no algorithms
+    */
+  def run[Q, P](
+    config: BatchPredictConfig,
+    engineInstance: EngineInstance,
+    engine: Engine[_, _, _, Q, P, _]): Unit = {
+
+    val engineParams = engine.engineInstanceToEngineParams(
+      engineInstance, config.jsonExtractor)
+
+    val kryo = KryoInstantiator.newKryoInjection
+
+    val storedModel = modeldata.get(engineInstance.id).getOrElse {
+      throw new NoSuchElementException(
+        s"No model data found for engine instance ${engineInstance.id}.")
+    }
+    val modelsFromEngineInstance =
+      kryo.invert(storedModel.models).get.asInstanceOf[Seq[Any]]
+
+    val prepareSparkContext = WorkflowContext(
+      batch = engineInstance.engineFactory,
+      executorEnv = engineInstance.env,
+      mode = "Batch Predict (model)",
+      sparkEnv = engineInstance.sparkConf)
+
+    val models = engine.prepareDeploy(
+      prepareSparkContext,
+      engineParams,
+      engineInstance.id,
+      modelsFromEngineInstance,
+      params = WorkflowParams()
+    )
+
+    val algorithms = engineParams.algorithmParamsList.map { case (n, p) =>
+      Doer(engine.algorithmClassMap(n), p)
+    }
+    // Queries are parsed and results serialized with the first algorithm's
+    // serializers, so fail on the driver rather than in every task.
+    require(algorithms.nonEmpty,
+      s"Engine instance ${engineInstance.id} has no algorithms configured.")
+
+    val servingParamsWithName = engineParams.servingParams
+
+    val serving = Doer(engine.servingClassMap(servingParamsWithName._1),
+      servingParamsWithName._2)
+
+    // NOTE(review): a second SparkContext is created here while
+    // prepareSparkContext is never stopped in this method. This assumes
+    // prepareDeploy has already stopped it; confirm this, especially for
+    // custom PersistentModel algorithms.
+    val runSparkContext = WorkflowContext(
+      batch = engineInstance.engineFactory,
+      executorEnv = engineInstance.env,
+      mode = "Batch Predict (runner)",
+      sparkEnv = engineInstance.sparkConf)
+
+    // Captured by the closure below instead of the whole config.
+    val jsonExtractorOption = config.jsonExtractor
+
+    try {
+      val inputRDD: RDD[String] = runSparkContext.
+        textFile(config.inputFilePath).
+        filter(_.trim.nonEmpty)
+      val queriesRDD: RDD[String] = config.queryPartitions match {
+        case Some(p) => inputRDD.repartition(p)
+        case None => inputRDD
+      }
+
+      // Runs on executors: algorithms, serving and models must be serializable.
+      val predictionsRDD: RDD[String] = queriesRDD.map { queryString =>
+        // Extract Query from Json
+        val query = JsonExtractor.extract(
+          jsonExtractorOption,
+          queryString,
+          algorithms.head.queryClass,
+          algorithms.head.querySerializer,
+          algorithms.head.gsonTypeAdapterFactories
+        )
+        // Deploy logic. First call Serving.supplement, then Algo.predict,
+        // finally Serving.serve.
+        val supplementedQuery = serving.supplementBase(query)
+        // TODO: Parallelize the following.
+        val predictions = algorithms.zipWithIndex.map { case (a, ai) =>
+          a.predictBase(models(ai), supplementedQuery)
+        }
+        // Notice that it is by design to call Serving.serve with the
+        // *original* query.
+        val prediction = serving.serveBase(query, predictions)
+        // Combine query with prediction, so the batch results are
+        // self-descriptive.
+        val predictionJValue = JsonExtractor.toJValue(
+          jsonExtractorOption,
+          Map("query" -> query,
+              "prediction" -> prediction),
+          algorithms.head.querySerializer,
+          algorithms.head.gsonTypeAdapterFactories)
+        // Return JSON string
+        compact(render(predictionJValue))
+      }
+
+      predictionsRDD.saveAsTextFile(config.outputFilePath)
+    } finally {
+      // Release the runner context even if reading, predicting or saving fails.
+      runSparkContext.stop()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala
new file mode 100644
index 0000000..35572c9
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.tools
+
+import org.apache.predictionio.tools.Common._
+import org.apache.predictionio.tools.ReturnTypes._
+import org.apache.predictionio.workflow.JsonExtractorOption
+import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
+
+import java.io.File
+import grizzled.slf4j.Logging
+
+import scala.sys.process._
+
+/** Options for `pio batchpredict`, forwarded to the BatchPredict Spark job.
+  *
+  * @param inputFilePath path to the query file, one JSON object per line
+  * @param outputFilePath path that receives the prediction results
+  * @param queryPartitions optional number of partitions for the query RDD
+  * @param variantJson engine variant file; when absent, `engine.json` in the
+  *                    engine directory is used
+  * @param jsonExtractor JSON extractor used to parse queries and serialize
+  *                      results
+  */
+case class BatchPredictArgs(
+  inputFilePath: String = "batchpredict-input.json",
+  outputFilePath: String = "batchpredict-output.json",
+  queryPartitions: Option[Int] = None,
+  variantJson: Option[File] = None,
+  jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both
+)
+
+
+/** Launches the `org.apache.predictionio.workflow.BatchPredict` Spark job. */
+object RunBatchPredict extends Logging {
+
+  /** Builds the BatchPredict command line and submits it with
+    * `Runner.runOnSpark`.
+    *
+    * @param engineInstanceId ID of the trained engine instance to use
+    * @param batchPredictArgs input/output paths, query partitioning, variant
+    *                         file and JSON extractor settings
+    * @param sparkArgs Spark settings passed through to `Runner.runOnSpark`
+    * @param pioHome path to the PredictionIO installation
+    * @param engineDirPath engine directory; provides the engine JARs and the
+    *                      default `engine.json` variant
+    * @param verbose when true, `--verbose` is passed to the job
+    * @return either an error message, or the launched process together with
+    *         a function that must be called when the process is complete
+    */
+  def runBatchPredict(
+    engineInstanceId: String,
+    batchPredictArgs: BatchPredictArgs,
+    sparkArgs: SparkArgs,
+    pioHome: String,
+    engineDirPath: String,
+    verbose: Boolean = false): Expected[(Process, () => Unit)] = {
+
+    // Engine JARs plus any files under $PIO_HOME/plugins; listFiles() yields
+    // null when that path cannot be listed, hence the Option wrapper.
+    val jarFiles = jarFilesForScala(engineDirPath).map(_.toURI) ++
+      Option(new File(pioHome, "plugins").listFiles())
+        .getOrElse(Array.empty[File]).map(_.toURI)
+
+    val variantPath = batchPredictArgs.variantJson
+      .getOrElse(new File(engineDirPath, "engine.json"))
+      .getCanonicalPath
+
+    val args = Seq[String](
+      "--input", batchPredictArgs.inputFilePath,
+      "--output", batchPredictArgs.outputFilePath,
+      "--engineInstanceId", engineInstanceId,
+      "--engine-variant", variantPath) ++
+      batchPredictArgs.queryPartitions.toSeq.flatMap { partitions =>
+        Seq("--query-partitions", partitions.toString)
+      } ++
+      (if (verbose) Seq("--verbose") else Seq()) ++
+      Seq("--json-extractor", batchPredictArgs.jsonExtractor.toString)
+
+    Runner.runOnSpark(
+      "org.apache.predictionio.workflow.BatchPredict",
+      args, sparkArgs, jarFiles, pioHome, verbose)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala
index e49c3fc..e3460a5 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala
@@ -21,8 +21,9 @@ import org.apache.predictionio.core.BuildInfo
 import org.apache.predictionio.controller.Utils
 import org.apache.predictionio.data.storage
 import org.apache.predictionio.tools.EitherLogging
-import org.apache.predictionio.tools.{RunWorkflow, RunServer}
-import org.apache.predictionio.tools.{DeployArgs, WorkflowArgs, SparkArgs, ServerArgs}
+import org.apache.predictionio.tools.{RunWorkflow, RunServer, RunBatchPredict}
+import org.apache.predictionio.tools.{
+  DeployArgs, WorkflowArgs, SparkArgs, ServerArgs, BatchPredictArgs}
 import org.apache.predictionio.tools.console.Console
 import org.apache.predictionio.tools.Common._
 import org.apache.predictionio.tools.ReturnTypes._
@@ -262,6 +263,56 @@ object Engine extends EitherLogging {
     }
   }
 
+  /** Batch predict with an engine.
+    *
+    * Verifies the template's minimum version, resolves the engine instance
+    * to use, and launches the BatchPredict Spark job for it.
+    *
+    * @param ea An instance of [[EngineArgs]]
+    * @param engineInstanceId Optional engine instance ID. When given, exactly
+    *                         that instance is used and an unknown ID is an
+    *                         error. When empty, the latest completed instance
+    *                         for this engine's ID, version and variant is used
+    * @param batchPredictArgs An instance of [[BatchPredictArgs]]
+    * @param sparkArgs An instance of [[SparkArgs]]
+    * @param pioHome [[String]] with a path to PIO installation
+    * @param verbose A [[Boolean]]
+    * @return An instance of [[Expected]] containing either [[Left]]
+    *         with an error message or [[Right]] with a handle to the process
+    *         of the running batch predict job and a function () => Unit
+    *         that must be called when the process is complete
+    */
+  def batchPredict(
+    ea: EngineArgs,
+    engineInstanceId: Option[String],
+    batchPredictArgs: BatchPredictArgs,
+    sparkArgs: SparkArgs,
+    pioHome: String,
+    verbose: Boolean = false): Expected[(Process, () => Unit)] = {
+
+    val engineDirPath = getEngineDirPath(ea.engineDir)
+    val verifyResult = Template.verifyTemplateMinVersion(
+      new File(engineDirPath, "template.json"))
+
+    verifyResult match {
+      case Left(err) => Left(err)
+      case Right(_) =>
+        val ei = Console.getEngineInfo(
+          batchPredictArgs.variantJson.getOrElse(new File(engineDirPath, "engine.json")),
+          engineDirPath)
+        val engineInstances = storage.Storage.getMetaDataEngineInstances
+        // An explicit ID is looked up as-is; it never falls back to the
+        // latest completed instance when unknown.
+        val engineInstance = engineInstanceId match {
+          case Some(eid) => engineInstances.get(eid)
+          case None =>
+            engineInstances.getLatestCompleted(
+              ei.engineId, ei.engineVersion, ei.variantId)
+        }
+        (engineInstance, engineInstanceId) match {
+          case (Some(instance), _) =>
+            RunBatchPredict.runBatchPredict(
+              instance.id, batchPredictArgs, sparkArgs, pioHome, engineDirPath, verbose)
+          case (None, Some(eid)) =>
+            logAndFail(s"Invalid engine instance ID ${eid}. Aborting.")
+          case (None, None) =>
+            logAndFail(s"No valid engine instance found for engine ${ei.engineId} " +
+              s"${ei.engineVersion}.\nTry running 'train' before 'batchpredict'. Aborting.")
+        }
+    }
+  }
+
   /** Running a driver on spark.
     *  The function starts a process and returns immediately
     *

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
index 535905a..4a72635 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
@@ -27,7 +27,8 @@ import org.apache.predictionio.tools.commands.{
   DashboardArgs, AdminServerArgs, ImportArgs, ExportArgs,
   BuildArgs, EngineArgs}
 import org.apache.predictionio.tools.{
-  EventServerArgs, SparkArgs, WorkflowArgs, ServerArgs, DeployArgs}
+  EventServerArgs, SparkArgs, WorkflowArgs, ServerArgs,
+  DeployArgs, BatchPredictArgs}
 import org.apache.predictionio.workflow.{JsonExtractorOption, WorkflowUtils}
 import org.json4s._
 import org.json4s.native.JsonMethods._
@@ -42,6 +43,7 @@ case class ConsoleArgs(
   workflow: WorkflowArgs = WorkflowArgs(),
   accessKey: AccessKeyArgs = AccessKeyArgs(),
   deploy: DeployArgs = DeployArgs(),
+  batchPredict: BatchPredictArgs = BatchPredictArgs(),
   eventServer: EventServerArgs = EventServerArgs(),
   adminServer: AdminServerArgs = AdminServerArgs(),
   dashboard: DashboardArgs = DashboardArgs(),
@@ -323,6 +325,46 @@ object Console extends Logging {
           } text("Port to unbind from. Default: 8000")
         )
       note("")
+      cmd("batchpredict").
+        text("Use an engine instance to process batch predictions. This\n" +
+              "command will pass all pass-through arguments to its underlying\n" +
+              "spark-submit command. All algorithm classes used in the engine\n" +
+              "must be serializable.").
+        action { (_, c) =>
+          c.copy(commands = c.commands :+ "batchpredict")
+        } children(
+          opt[String]("input") action { (x, c) =>
+            c.copy(batchPredict = c.batchPredict.copy(inputFilePath = x))
+          } text("Path to file containing queries; a multi-object JSON file\n" +
+                  "with one query object per line. Accepts any valid Hadoop\n" +
+                  "file URL. Default: batchpredict-input.json"),
+          opt[String]("output") action { (x, c) =>
+            c.copy(batchPredict = c.batchPredict.copy(outputFilePath = x))
+          } text("Path to file to receive results; a multi-object JSON file\n" +
+                  "with one object per line, the prediction + original query.\n" +
+                  "Accepts any valid Hadoop file URL. Actual output will be\n" +
+                  "written as Hadoop partition files in a directory with the\n" +
+                  "output name. Default: batchpredict-output.json"),
+          opt[Int]("query-partitions") action { (x, c) =>
+            c.copy(batchPredict = c.batchPredict.copy(queryPartitions = Some(x)))
+          } text("Limit concurrency of predictions by setting the number\n" +
+                  "of partitions used internally for the RDD of queries.\n" +
+                  "Default: number created by Spark context's `textFile`"),
+          opt[String]("engine-instance-id") action { (x, c) =>
+            c.copy(engineInstanceId = Some(x))
+          } text("Engine instance ID."),
+          opt[String]("json-extractor") action { (x, c) =>
+            c.copy(workflow = c.workflow.copy(jsonExtractor = JsonExtractorOption.withName(x)))
+          } validate { x =>
+            if (JsonExtractorOption.values.map(_.toString).contains(x)) {
+              success
+            } else {
+              val validOptions = JsonExtractorOption.values.mkString("|")
+              failure(s"$x is not a valid json-extractor option [$validOptions]")
+            }
+          }
+        )
+      note("")
       cmd("dashboard").
         text("Launch a dashboard at the specific IP and port.").
         action { (_, c) =>
@@ -644,6 +686,19 @@ object Console extends Logging {
             ca.verbose)
         case Seq("undeploy") =>
           Pio.undeploy(ca.deploy)
+        case Seq("batchpredict") =>
+          Pio.batchPredict(
+            ca.engine,
+            ca.engineInstanceId,
+            BatchPredictArgs(
+              ca.batchPredict.inputFilePath,
+              ca.batchPredict.outputFilePath,
+              ca.batchPredict.queryPartitions,
+              ca.workflow.variantJson,
+              ca.workflow.jsonExtractor),
+            ca.spark,
+            ca.pioHome.get,
+            ca.verbose)
         case Seq("dashboard") =>
           Pio.dashboard(ca.dashboard)
         case Seq("eventserver") =>
@@ -756,6 +811,7 @@ object Console extends Logging {
     "build" -> txt.build().toString,
     "train" -> txt.train().toString,
     "deploy" -> txt.deploy().toString,
+    "batchpredict" -> txt.batchpredict().toString,
     "eventserver" -> txt.eventserver().toString,
     "adminserver" -> txt.adminserver().toString,
     "app" -> txt.app().toString,

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala
index dd78717..ef4581b 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala
@@ -18,7 +18,8 @@
 package org.apache.predictionio.tools.console
 
 import org.apache.predictionio.tools.{
-  EventServerArgs, SparkArgs, WorkflowArgs, ServerArgs, DeployArgs}
+  EventServerArgs, SparkArgs, WorkflowArgs, ServerArgs,
+  DeployArgs, BatchPredictArgs}
 import org.apache.predictionio.tools.commands.{
   DashboardArgs, AdminServerArgs, ImportArgs, ExportArgs,
   BuildArgs, EngineArgs, Management, Engine, Import, Export,
@@ -104,6 +105,16 @@ object Pio extends Logging {
 
   def undeploy(da: DeployArgs): Int = Engine.undeploy(da)
 
+  /** Console entry for `pio batchpredict`.
+    *
+    * Launches the job through `Engine.batchPredict` and hands the result to
+    * `processAwaitAndClean`.
+    *
+    * @return the status computed by `processAwaitAndClean`, presumably the
+    *         job's exit code or a failure code when launching failed
+    *         (TODO confirm)
+    */
+  def batchPredict(
+    ea: EngineArgs,
+    engineInstanceId: Option[String],
+    batchPredictArgs: BatchPredictArgs,
+    sparkArgs: SparkArgs,
+    pioHome: String,
+    verbose: Boolean = false): Int = {
+    val launched = Engine.batchPredict(
+      ea,
+      engineInstanceId,
+      batchPredictArgs,
+      sparkArgs,
+      pioHome,
+      verbose)
+    processAwaitAndClean(launched)
+  }
+
   def dashboard(da: DashboardArgs): Int = {
     Management.dashboard(da).awaitTermination
     0

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/tools/src/main/twirl/org/apache/predictionio/tools/console/batchpredict.scala.txt
----------------------------------------------------------------------
diff --git a/tools/src/main/twirl/org/apache/predictionio/tools/console/batchpredict.scala.txt b/tools/src/main/twirl/org/apache/predictionio/tools/console/batchpredict.scala.txt
new file mode 100644
index 0000000..d9d5d74
--- /dev/null
+++ b/tools/src/main/twirl/org/apache/predictionio/tools/console/batchpredict.scala.txt
@@ -0,0 +1,25 @@
+Usage: pio batchpredict [--input <value>]
+                        [--output <value>]
+                        [--query-partitions <value>]
+                        [--engine-instance-id <value>]
+                        [--json-extractor <value>]
+
+Use an engine instance to process batch predictions. This command will pass all
+pass-through arguments to its underlying spark-submit command. All algorithm
+classes used in the engine must be serializable.
+
+  --input <value>
+      Path to file containing queries; a multi-object JSON file with one
+      query object per line. Accepts any valid Hadoop file URL.
+      Default: batchpredict-input.json
+  --output <value>
+      Path to file to receive results; a multi-object JSON file with one
+      object per line, the prediction + original query. Accepts any
+      valid Hadoop file URL. Actual output will be written as Hadoop
+      partition files in a directory with the output name.
+      Default: batchpredict-output.json
+  --query-partitions <value>
+      Limit concurrency of predictions by setting the number of partitions
+      used internally for the RDD of queries.
+      Default: number created by Spark context's `textFile`
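+  --engine-instance-id <value>
+      Engine instance ID. Default: the latest completed training of this
+      engine (matching its engine ID, version, and variant).
+  --json-extractor <value>
+      JSON extractor used to parse queries and serialize results.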

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt
----------------------------------------------------------------------
diff --git a/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt b/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt
index 5efa4bf..01be96d 100644
--- a/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt
+++ b/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt
@@ -38,6 +38,7 @@ The most commonly used pio commands are:
     build         Build an engine at the current directory
     train         Kick off a training using an engine
     deploy        Deploy an engine as an engine server
+    batchpredict  Process bulk predictions with an engine
     eventserver   Launch an Event Server
     app           Manage apps that are used by the Event Server
     accesskey     Manage app access keys