You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:33 UTC
[02/34] incubator-predictionio git commit: rename all except examples
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
new file mode 100644
index 0000000..87aac07
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
@@ -0,0 +1,1277 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.console
+
+import java.io.File
+import java.net.URI
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.controller.Utils
+import org.apache.predictionio.core.BuildInfo
+import org.apache.predictionio.data.api.EventServer
+import org.apache.predictionio.data.api.EventServerConfig
+import org.apache.predictionio.data.storage
+import org.apache.predictionio.data.storage.EngineManifest
+import org.apache.predictionio.data.storage.EngineManifestSerializer
+import org.apache.predictionio.data.storage.hbase.upgrade.Upgrade_0_8_3
+import org.apache.predictionio.tools.RegisterEngine
+import org.apache.predictionio.tools.RunServer
+import org.apache.predictionio.tools.RunWorkflow
+import org.apache.predictionio.tools.admin.AdminServer
+import org.apache.predictionio.tools.admin.AdminServerConfig
+import org.apache.predictionio.tools.dashboard.Dashboard
+import org.apache.predictionio.tools.dashboard.DashboardConfig
+import org.apache.predictionio.workflow.JsonExtractorOption
+import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
+import org.apache.predictionio.workflow.WorkflowUtils
+import org.apache.commons.io.FileUtils
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+import semverfi._
+
+import scala.collection.JavaConversions._
+import scala.io.Source
+import scala.sys.process._
+import scala.util.Random
+import scalaj.http.Http
+
+case class ConsoleArgs(
+ common: CommonArgs = CommonArgs(),
+ build: BuildArgs = BuildArgs(),
+ app: AppArgs = AppArgs(),
+ accessKey: AccessKeyArgs = AccessKeyArgs(),
+ deploy: DeployArgs = DeployArgs(),
+ eventServer: EventServerArgs = EventServerArgs(),
+ adminServer: AdminServerArgs = AdminServerArgs(),
+ dashboard: DashboardArgs = DashboardArgs(),
+ upgrade: UpgradeArgs = UpgradeArgs(),
+ template: TemplateArgs = TemplateArgs(),
+ export: ExportArgs = ExportArgs(),
+ imprt: ImportArgs = ImportArgs(),
+ commands: Seq[String] = Seq(),
+ metricsClass: Option[String] = None,
+ metricsParamsJsonPath: Option[String] = None,
+ paramsPath: String = "params",
+ engineInstanceId: Option[String] = None,
+ mainClass: Option[String] = None)
+
+case class CommonArgs(
+ batch: String = "",
+ sparkPassThrough: Seq[String] = Seq(),
+ driverPassThrough: Seq[String] = Seq(),
+ pioHome: Option[String] = None,
+ sparkHome: Option[String] = None,
+ engineId: Option[String] = None,
+ engineVersion: Option[String] = None,
+ engineFactory: Option[String] = None,
+ engineParamsKey: Option[String] = None,
+ evaluation: Option[String] = None,
+ engineParamsGenerator: Option[String] = None,
+ variantJson: File = new File("engine.json"),
+ manifestJson: File = new File("manifest.json"),
+ stopAfterRead: Boolean = false,
+ stopAfterPrepare: Boolean = false,
+ skipSanityCheck: Boolean = false,
+ verbose: Boolean = false,
+ verbosity: Int = 0,
+ sparkKryo: Boolean = false,
+ scratchUri: Option[URI] = None,
+ jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both)
+
+case class BuildArgs(
+ sbt: Option[File] = None,
+ sbtExtra: Option[String] = None,
+ sbtAssemblyPackageDependency: Boolean = true,
+ sbtClean: Boolean = false,
+ uberJar: Boolean = false,
+ forceGeneratePIOSbt: Boolean = false)
+
+case class DeployArgs(
+ ip: String = "0.0.0.0",
+ port: Int = 8000,
+ logUrl: Option[String] = None,
+ logPrefix: Option[String] = None)
+
+case class EventServerArgs(
+ enabled: Boolean = false,
+ ip: String = "0.0.0.0",
+ port: Int = 7070,
+ stats: Boolean = false)
+
+case class AdminServerArgs(
+ip: String = "127.0.0.1",
+port: Int = 7071)
+
+case class DashboardArgs(
+ ip: String = "127.0.0.1",
+ port: Int = 9000)
+
+case class UpgradeArgs(
+ from: String = "0.0.0",
+ to: String = "0.0.0",
+ oldAppId: Int = 0,
+ newAppId: Int = 0
+)
+
+object Console extends Logging {
+ def main(args: Array[String]): Unit = {
+ val parser = new scopt.OptionParser[ConsoleArgs]("pio") {
+ override def showUsageOnError: Boolean = false
+ head("PredictionIO Command Line Interface Console", BuildInfo.version)
+ help("")
+ note("Note that it is possible to supply pass-through arguments at\n" +
+ "the end of the command by using a '--' separator, e.g.\n\n" +
+ "pio train --params-path params -- --master spark://mycluster:7077\n" +
+ "\nIn the example above, the '--master' argument will be passed to\n" +
+ "underlying spark-submit command. Please refer to the usage section\n" +
+ "for each command for more information.\n\n" +
+ "The following options are common to all commands:\n")
+ opt[String]("pio-home") action { (x, c) =>
+ c.copy(common = c.common.copy(pioHome = Some(x)))
+ } text("Root directory of a PredictionIO installation.\n" +
+ " Specify this if automatic discovery fail.")
+ opt[String]("spark-home") action { (x, c) =>
+ c.copy(common = c.common.copy(sparkHome = Some(x)))
+ } text("Root directory of an Apache Spark installation.\n" +
+ " If not specified, will try to use the SPARK_HOME\n" +
+ " environmental variable. If this fails as well, default to\n" +
+ " current directory.")
+ opt[String]("engine-id") abbr("ei") action { (x, c) =>
+ c.copy(common = c.common.copy(engineId = Some(x)))
+ } text("Specify an engine ID. Usually used by distributed deployment.")
+ opt[String]("engine-version") abbr("ev") action { (x, c) =>
+ c.copy(common = c.common.copy(engineVersion = Some(x)))
+ } text("Specify an engine version. Usually used by distributed " +
+ "deployment.")
+ opt[File]("variant") abbr("v") action { (x, c) =>
+ c.copy(common = c.common.copy(variantJson = x))
+ }
+ opt[File]("manifest") abbr("m") action { (x, c) =>
+ c.copy(common = c.common.copy(manifestJson = x))
+ }
+ opt[File]("sbt") action { (x, c) =>
+ c.copy(build = c.build.copy(sbt = Some(x)))
+ } validate { x =>
+ if (x.exists) {
+ success
+ } else {
+ failure(s"${x.getCanonicalPath} does not exist.")
+ }
+ } text("Path to sbt. Default: sbt")
+ opt[Unit]("verbose") action { (x, c) =>
+ c.copy(common = c.common.copy(verbose = true))
+ }
+ opt[Unit]("spark-kryo") abbr("sk") action { (x, c) =>
+ c.copy(common = c.common.copy(sparkKryo = true))
+ }
+ opt[String]("scratch-uri") action { (x, c) =>
+ c.copy(common = c.common.copy(scratchUri = Some(new URI(x))))
+ }
+ note("")
+ cmd("version").
+ text("Displays the version of this command line console.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "version")
+ }
+ note("")
+ cmd("help").action { (_, c) =>
+ c.copy(commands = c.commands :+ "help")
+ } children(
+ arg[String]("<command>") optional()
+ action { (x, c) =>
+ c.copy(commands = c.commands :+ x)
+ }
+ )
+ note("")
+ cmd("build").
+ text("Build an engine at the current directory.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "build")
+ } children(
+ opt[String]("sbt-extra") action { (x, c) =>
+ c.copy(build = c.build.copy(sbtExtra = Some(x)))
+ } text("Extra command to pass to SBT when it builds your engine."),
+ opt[Unit]("clean") action { (x, c) =>
+ c.copy(build = c.build.copy(sbtClean = true))
+ } text("Clean build."),
+ opt[Unit]("no-asm") action { (x, c) =>
+ c.copy(build = c.build.copy(sbtAssemblyPackageDependency = false))
+ } text("Skip building external dependencies assembly."),
+ opt[Unit]("uber-jar") action { (x, c) =>
+ c.copy(build = c.build.copy(uberJar = true))
+ },
+ opt[Unit]("generate-pio-sbt") action { (x, c) =>
+ c.copy(build = c.build.copy(forceGeneratePIOSbt = true))
+ }
+ )
+ note("")
+ cmd("unregister").
+ text("Unregister an engine at the current directory.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "unregister")
+ }
+ note("")
+ cmd("train").
+ text("Kick off a training using an engine. This will produce an\n" +
+ "engine instance. This command will pass all pass-through\n" +
+ "arguments to its underlying spark-submit command.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "train")
+ } children(
+ opt[String]("batch") action { (x, c) =>
+ c.copy(common = c.common.copy(batch = x))
+ } text("Batch label of the run."),
+ opt[String]("params-path") action { (x, c) =>
+ c.copy(paramsPath = x)
+ } text("Directory to lookup parameters JSON files. Default: params"),
+ opt[String]("metrics-params") abbr("mp") action { (x, c) =>
+ c.copy(metricsParamsJsonPath = Some(x))
+ } text("Metrics parameters JSON file. Will try to use\n" +
+ " metrics.json in the base path."),
+ opt[Unit]("skip-sanity-check") abbr("ssc") action { (x, c) =>
+ c.copy(common = c.common.copy(skipSanityCheck = true))
+ },
+ opt[Unit]("stop-after-read") abbr("sar") action { (x, c) =>
+ c.copy(common = c.common.copy(stopAfterRead = true))
+ },
+ opt[Unit]("stop-after-prepare") abbr("sap") action { (x, c) =>
+ c.copy(common = c.common.copy(stopAfterPrepare = true))
+ },
+ opt[Unit]("uber-jar") action { (x, c) =>
+ c.copy(build = c.build.copy(uberJar = true))
+ },
+ opt[Int]("verbosity") action { (x, c) =>
+ c.copy(common = c.common.copy(verbosity = x))
+ },
+ opt[String]("engine-factory") action { (x, c) =>
+ c.copy(common = c.common.copy(engineFactory = Some(x)))
+ },
+ opt[String]("engine-params-key") action { (x, c) =>
+ c.copy(common = c.common.copy(engineParamsKey = Some(x)))
+ },
+ opt[String]("json-extractor") action { (x, c) =>
+ c.copy(common = c.common.copy(jsonExtractor = JsonExtractorOption.withName(x)))
+ } validate { x =>
+ if (JsonExtractorOption.values.map(_.toString).contains(x)) {
+ success
+ } else {
+ val validOptions = JsonExtractorOption.values.mkString("|")
+ failure(s"$x is not a valid json-extractor option [$validOptions]")
+ }
+ }
+ )
+ note("")
+ cmd("eval").
+ text("Kick off an evaluation using an engine. This will produce an\n" +
+ "engine instance. This command will pass all pass-through\n" +
+ "arguments to its underlying spark-submit command.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "eval")
+ } children(
+ arg[String]("<evaluation-class>") action { (x, c) =>
+ c.copy(common = c.common.copy(evaluation = Some(x)))
+ },
+ arg[String]("[<engine-parameters-generator-class>]") optional() action { (x, c) =>
+ c.copy(common = c.common.copy(engineParamsGenerator = Some(x)))
+ } text("Optional engine parameters generator class, overriding the first argument"),
+ opt[String]("batch") action { (x, c) =>
+ c.copy(common = c.common.copy(batch = x))
+ } text("Batch label of the run."),
+ opt[String]("json-extractor") action { (x, c) =>
+ c.copy(common = c.common.copy(jsonExtractor = JsonExtractorOption.withName(x)))
+ } validate { x =>
+ if (JsonExtractorOption.values.map(_.toString).contains(x)) {
+ success
+ } else {
+ val validOptions = JsonExtractorOption.values.mkString("|")
+ failure(s"$x is not a valid json-extractor option [$validOptions]")
+ }
+ }
+ )
+ note("")
+ cmd("deploy").
+ text("Deploy an engine instance as a prediction server. This\n" +
+ "command will pass all pass-through arguments to its underlying\n" +
+ "spark-submit command.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "deploy")
+ } children(
+ opt[String]("batch") action { (x, c) =>
+ c.copy(common = c.common.copy(batch = x))
+ } text("Batch label of the deployment."),
+ opt[String]("engine-instance-id") action { (x, c) =>
+ c.copy(engineInstanceId = Some(x))
+ } text("Engine instance ID."),
+ opt[String]("ip") action { (x, c) =>
+ c.copy(deploy = c.deploy.copy(ip = x))
+ },
+ opt[Int]("port") action { (x, c) =>
+ c.copy(deploy = c.deploy.copy(port = x))
+ } text("Port to bind to. Default: 8000"),
+ opt[Unit]("feedback") action { (_, c) =>
+ c.copy(eventServer = c.eventServer.copy(enabled = true))
+ } text("Enable feedback loop to event server."),
+ opt[String]("event-server-ip") action { (x, c) =>
+ c.copy(eventServer = c.eventServer.copy(ip = x))
+ },
+ opt[Int]("event-server-port") action { (x, c) =>
+ c.copy(eventServer = c.eventServer.copy(port = x))
+ } text("Event server port. Default: 7070"),
+ opt[Int]("admin-server-port") action { (x, c) =>
+ c.copy(adminServer = c.adminServer.copy(port = x))
+ } text("Admin server port. Default: 7071"),
+ opt[String]("admin-server-port") action { (x, c) =>
+ c.copy(adminServer = c.adminServer.copy(ip = x))
+ } text("Admin server IP. Default: localhost"),
+ opt[String]("accesskey") action { (x, c) =>
+ c.copy(accessKey = c.accessKey.copy(accessKey = x))
+ } text("Access key of the App where feedback data will be stored."),
+ opt[Unit]("uber-jar") action { (x, c) =>
+ c.copy(build = c.build.copy(uberJar = true))
+ },
+ opt[String]("log-url") action { (x, c) =>
+ c.copy(deploy = c.deploy.copy(logUrl = Some(x)))
+ },
+ opt[String]("log-prefix") action { (x, c) =>
+ c.copy(deploy = c.deploy.copy(logPrefix = Some(x)))
+ },
+ opt[String]("json-extractor") action { (x, c) =>
+ c.copy(common = c.common.copy(jsonExtractor = JsonExtractorOption.withName(x)))
+ } validate { x =>
+ if (JsonExtractorOption.values.map(_.toString).contains(x)) {
+ success
+ } else {
+ val validOptions = JsonExtractorOption.values.mkString("|")
+ failure(s"$x is not a valid json-extractor option [$validOptions]")
+ }
+ }
+ )
+ note("")
+ cmd("undeploy").
+ text("Undeploy an engine instance as a prediction server.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "undeploy")
+ } children(
+ opt[String]("ip") action { (x, c) =>
+ c.copy(deploy = c.deploy.copy(ip = x))
+ },
+ opt[Int]("port") action { (x, c) =>
+ c.copy(deploy = c.deploy.copy(port = x))
+ } text("Port to unbind from. Default: 8000")
+ )
+ note("")
+ cmd("dashboard").
+ text("Launch a dashboard at the specific IP and port.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "dashboard")
+ } children(
+ opt[String]("ip") action { (x, c) =>
+ c.copy(dashboard = c.dashboard.copy(ip = x))
+ },
+ opt[Int]("port") action { (x, c) =>
+ c.copy(dashboard = c.dashboard.copy(port = x))
+ } text("Port to bind to. Default: 9000")
+ )
+ note("")
+ cmd("eventserver").
+ text("Launch an Event Server at the specific IP and port.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "eventserver")
+ } children(
+ opt[String]("ip") action { (x, c) =>
+ c.copy(eventServer = c.eventServer.copy(ip = x))
+ },
+ opt[Int]("port") action { (x, c) =>
+ c.copy(eventServer = c.eventServer.copy(port = x))
+ } text("Port to bind to. Default: 7070"),
+ opt[Unit]("stats") action { (x, c) =>
+ c.copy(eventServer = c.eventServer.copy(stats = true))
+ }
+ )
+ cmd("adminserver").
+ text("Launch an Admin Server at the specific IP and port.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "adminserver")
+ } children(
+ opt[String]("ip") action { (x, c) =>
+ c.copy(adminServer = c.adminServer.copy(ip = x))
+ } text("IP to bind to. Default: localhost"),
+ opt[Int]("port") action { (x, c) =>
+ c.copy(adminServer = c.adminServer.copy(port = x))
+ } text("Port to bind to. Default: 7071")
+ )
+ note("")
+ cmd("run").
+ text("Launch a driver program. This command will pass all\n" +
+ "pass-through arguments to its underlying spark-submit command.\n" +
+ "In addition, it also supports a second level of pass-through\n" +
+ "arguments to the driver program, e.g.\n" +
+ "pio run -- --master spark://localhost:7077 -- --driver-arg foo").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "run")
+ } children(
+ arg[String]("<main class>") action { (x, c) =>
+ c.copy(mainClass = Some(x))
+ } text("Main class name of the driver program."),
+ opt[String]("sbt-extra") action { (x, c) =>
+ c.copy(build = c.build.copy(sbtExtra = Some(x)))
+ } text("Extra command to pass to SBT when it builds your engine."),
+ opt[Unit]("clean") action { (x, c) =>
+ c.copy(build = c.build.copy(sbtClean = true))
+ } text("Clean build."),
+ opt[Unit]("no-asm") action { (x, c) =>
+ c.copy(build = c.build.copy(sbtAssemblyPackageDependency = false))
+ } text("Skip building external dependencies assembly.")
+ )
+ note("")
+ cmd("status").
+ text("Displays status information about the PredictionIO system.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "status")
+ }
+ note("")
+ cmd("upgrade").
+ text("Upgrade tool").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "upgrade")
+ } children(
+ arg[String]("<from version>") action { (x, c) =>
+ c.copy(upgrade = c.upgrade.copy(from = x))
+ } text("The version upgraded from."),
+ arg[String]("<to version>") action { (x, c) =>
+ c.copy(upgrade = c.upgrade.copy(to = x))
+ } text("The version upgraded to."),
+ arg[Int]("<old App ID>") action { (x, c) =>
+ c.copy(upgrade = c.upgrade.copy(oldAppId = x))
+ } text("Old App ID."),
+ arg[Int]("<new App ID>") action { (x, c) =>
+ c.copy(upgrade = c.upgrade.copy(newAppId = x))
+ } text("New App ID.")
+ )
+ note("")
+ cmd("app").
+ text("Manage apps.\n").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "app")
+ } children(
+ cmd("new").
+ text("Create a new app key to app ID mapping.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "new")
+ } children(
+ opt[Int]("id") action { (x, c) =>
+ c.copy(app = c.app.copy(id = Some(x)))
+ },
+ opt[String]("description") action { (x, c) =>
+ c.copy(app = c.app.copy(description = Some(x)))
+ },
+ opt[String]("access-key") action { (x, c) =>
+ c.copy(accessKey = c.accessKey.copy(accessKey = x))
+ },
+ arg[String]("<name>") action { (x, c) =>
+ c.copy(app = c.app.copy(name = x))
+ }
+ ),
+ note(""),
+ cmd("list").
+ text("List all apps.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "list")
+ },
+ note(""),
+ cmd("show").
+ text("Show details of an app.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "show")
+ } children (
+ arg[String]("<name>") action { (x, c) =>
+ c.copy(app = c.app.copy(name = x))
+ } text("Name of the app to be shown.")
+ ),
+ note(""),
+ cmd("delete").
+ text("Delete an app.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "delete")
+ } children(
+ arg[String]("<name>") action { (x, c) =>
+ c.copy(app = c.app.copy(name = x))
+ } text("Name of the app to be deleted."),
+ opt[Unit]("force") abbr("f") action { (x, c) =>
+ c.copy(app = c.app.copy(force = true))
+ } text("Delete an app without prompting for confirmation")
+ ),
+ note(""),
+ cmd("data-delete").
+ text("Delete data of an app").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "data-delete")
+ } children(
+ arg[String]("<name>") action { (x, c) =>
+ c.copy(app = c.app.copy(name = x))
+ } text("Name of the app whose data to be deleted."),
+ opt[String]("channel") action { (x, c) =>
+ c.copy(app = c.app.copy(dataDeleteChannel = Some(x)))
+ } text("Name of channel whose data to be deleted."),
+ opt[Unit]("all") action { (x, c) =>
+ c.copy(app = c.app.copy(all = true))
+ } text("Delete data of all channels including default"),
+ opt[Unit]("force") abbr("f") action { (x, c) =>
+ c.copy(app = c.app.copy(force = true))
+ } text("Delete data of an app without prompting for confirmation")
+ ),
+ note(""),
+ cmd("channel-new").
+ text("Create a new channel for the app.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "channel-new")
+ } children (
+ arg[String]("<name>") action { (x, c) =>
+ c.copy(app = c.app.copy(name = x))
+ } text("App name."),
+ arg[String]("<channel>") action { (x, c) =>
+ c.copy(app = c.app.copy(channel = x))
+ } text ("Channel name to be created.")
+ ),
+ note(""),
+ cmd("channel-delete").
+ text("Delete a channel of the app.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "channel-delete")
+ } children (
+ arg[String]("<name>") action { (x, c) =>
+ c.copy(app = c.app.copy(name = x))
+ } text("App name."),
+ arg[String]("<channel>") action { (x, c) =>
+ c.copy(app = c.app.copy(channel = x))
+ } text ("Channel name to be deleted."),
+ opt[Unit]("force") abbr("f") action { (x, c) =>
+ c.copy(app = c.app.copy(force = true))
+ } text("Delete a channel of the app without prompting for confirmation")
+ )
+ )
+ note("")
+ cmd("accesskey").
+ text("Manage app access keys.\n").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "accesskey")
+ } children(
+ cmd("new").
+ text("Add allowed event(s) to an access key.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "new")
+ } children(
+ opt[String]("key") action { (x, c) =>
+ c.copy(accessKey = c.accessKey.copy(accessKey = x))
+ },
+ arg[String]("<app name>") action { (x, c) =>
+ c.copy(app = c.app.copy(name = x))
+ },
+ arg[String]("[<event1> <event2> ...]") unbounded() optional()
+ action { (x, c) =>
+ c.copy(accessKey = c.accessKey.copy(
+ events = c.accessKey.events :+ x))
+ }
+ ),
+ cmd("list").
+ text("List all access keys of an app.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "list")
+ } children(
+ arg[String]("<app name>") optional() action { (x, c) =>
+ c.copy(app = c.app.copy(name = x))
+ } text("App name.")
+ ),
+ note(""),
+ cmd("delete").
+ text("Delete an access key.").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "delete")
+ } children(
+ arg[String]("<access key>") action { (x, c) =>
+ c.copy(accessKey = c.accessKey.copy(accessKey = x))
+ } text("The access key to be deleted.")
+ )
+ )
+ cmd("template").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "template")
+ } children(
+ cmd("get").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "get")
+ } children(
+ arg[String]("<template ID>") required() action { (x, c) =>
+ c.copy(template = c.template.copy(repository = x))
+ },
+ arg[String]("<new engine directory>") action { (x, c) =>
+ c.copy(template = c.template.copy(directory = x))
+ },
+ opt[String]("version") action { (x, c) =>
+ c.copy(template = c.template.copy(version = Some(x)))
+ },
+ opt[String]("name") action { (x, c) =>
+ c.copy(template = c.template.copy(name = Some(x)))
+ },
+ opt[String]("package") action { (x, c) =>
+ c.copy(template = c.template.copy(packageName = Some(x)))
+ },
+ opt[String]("email") action { (x, c) =>
+ c.copy(template = c.template.copy(email = Some(x)))
+ }
+ ),
+ cmd("list").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "list")
+ }
+ )
+ cmd("export").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "export")
+ } children(
+ opt[Int]("appid") required() action { (x, c) =>
+ c.copy(export = c.export.copy(appId = x))
+ },
+ opt[String]("output") required() action { (x, c) =>
+ c.copy(export = c.export.copy(outputPath = x))
+ },
+ opt[String]("format") action { (x, c) =>
+ c.copy(export = c.export.copy(format = x))
+ },
+ opt[String]("channel") action { (x, c) =>
+ c.copy(export = c.export.copy(channel = Some(x)))
+ }
+ )
+ cmd("import").
+ action { (_, c) =>
+ c.copy(commands = c.commands :+ "import")
+ } children(
+ opt[Int]("appid") required() action { (x, c) =>
+ c.copy(imprt = c.imprt.copy(appId = x))
+ },
+ opt[String]("input") required() action { (x, c) =>
+ c.copy(imprt = c.imprt.copy(inputPath = x))
+ },
+ opt[String]("channel") action { (x, c) =>
+ c.copy(imprt = c.imprt.copy(channel = Some(x)))
+ }
+ )
+ }
+
+ val separatorIndex = args.indexWhere(_ == "--")
+ val (consoleArgs, theRest) =
+ if (separatorIndex == -1) {
+ (args, Array[String]())
+ } else {
+ args.splitAt(separatorIndex)
+ }
+ val allPassThroughArgs = theRest.drop(1)
+ val secondSepIdx = allPassThroughArgs.indexWhere(_ == "--")
+ val (sparkPassThroughArgs, driverPassThroughArgs) =
+ if (secondSepIdx == -1) {
+ (allPassThroughArgs, Array[String]())
+ } else {
+ val t = allPassThroughArgs.splitAt(secondSepIdx)
+ (t._1, t._2.drop(1))
+ }
+
+ parser.parse(consoleArgs, ConsoleArgs()) map { pca =>
+ val ca = pca.copy(common = pca.common.copy(
+ sparkPassThrough = sparkPassThroughArgs,
+ driverPassThrough = driverPassThroughArgs))
+ WorkflowUtils.modifyLogging(ca.common.verbose)
+ val rv: Int = ca.commands match {
+ case Seq("") =>
+ System.err.println(help())
+ 1
+ case Seq("version") =>
+ version(ca)
+ 0
+ case Seq("build") =>
+ regenerateManifestJson(ca.common.manifestJson)
+ build(ca)
+ case Seq("unregister") =>
+ unregister(ca)
+ 0
+ case Seq("train") =>
+ regenerateManifestJson(ca.common.manifestJson)
+ train(ca)
+ case Seq("eval") =>
+ regenerateManifestJson(ca.common.manifestJson)
+ train(ca)
+ case Seq("deploy") =>
+ deploy(ca)
+ case Seq("undeploy") =>
+ undeploy(ca)
+ case Seq("dashboard") =>
+ dashboard(ca)
+ 0
+ case Seq("eventserver") =>
+ eventserver(ca)
+ 0
+ case Seq("adminserver") =>
+ adminserver(ca)
+ 0
+ case Seq("run") =>
+ generateManifestJson(ca.common.manifestJson)
+ run(ca)
+ case Seq("status") =>
+ status(ca)
+ case Seq("upgrade") =>
+ upgrade(ca)
+ 0
+ case Seq("app", "new") =>
+ App.create(ca)
+ case Seq("app", "list") =>
+ App.list(ca)
+ case Seq("app", "show") =>
+ App.show(ca)
+ case Seq("app", "delete") =>
+ App.delete(ca)
+ case Seq("app", "data-delete") =>
+ App.dataDelete(ca)
+ case Seq("app", "channel-new") =>
+ App.channelNew(ca)
+ case Seq("app", "channel-delete") =>
+ App.channelDelete(ca)
+ case Seq("accesskey", "new") =>
+ AccessKey.create(ca)
+ case Seq("accesskey", "list") =>
+ AccessKey.list(ca)
+ case Seq("accesskey", "delete") =>
+ AccessKey.delete(ca)
+ case Seq("template", "get") =>
+ Template.get(ca)
+ case Seq("template", "list") =>
+ Template.list(ca)
+ case Seq("export") =>
+ Export.eventsToFile(ca)
+ case Seq("import") =>
+ Import.fileToEvents(ca)
+ case _ =>
+ System.err.println(help(ca.commands))
+ 1
+ }
+ sys.exit(rv)
+ } getOrElse {
+ val command = args.toSeq.filterNot(_.startsWith("--")).head
+ System.err.println(help(Seq(command)))
+ sys.exit(1)
+ }
+ }
+
+ def help(commands: Seq[String] = Seq()): String = {
+ if (commands.isEmpty) {
+ mainHelp
+ } else {
+ val stripped =
+ (if (commands.head == "help") commands.drop(1) else commands).
+ mkString("-")
+ helpText.getOrElse(stripped, s"Help is unavailable for ${stripped}.")
+ }
+ }
+
+ val mainHelp = txt.main().toString
+
+ val helpText = Map(
+ "" -> mainHelp,
+ "status" -> txt.status().toString,
+ "upgrade" -> txt.upgrade().toString,
+ "version" -> txt.version().toString,
+ "template" -> txt.template().toString,
+ "build" -> txt.build().toString,
+ "train" -> txt.train().toString,
+ "deploy" -> txt.deploy().toString,
+ "eventserver" -> txt.eventserver().toString,
+ "adminserver" -> txt.adminserver().toString,
+ "app" -> txt.app().toString,
+ "accesskey" -> txt.accesskey().toString,
+ "import" -> txt.imprt().toString,
+ "export" -> txt.export().toString,
+ "run" -> txt.run().toString,
+ "eval" -> txt.eval().toString,
+ "dashboard" -> txt.dashboard().toString)
+
+ def version(ca: ConsoleArgs): Unit = println(BuildInfo.version)
+
+ def build(ca: ConsoleArgs): Int = {
+ Template.verifyTemplateMinVersion(new File("template.json"))
+ compile(ca)
+ info("Looking for an engine...")
+ val jarFiles = jarFilesForScala
+ if (jarFiles.isEmpty) {
+ error("No engine found. Your build might have failed. Aborting.")
+ return 1
+ }
+ jarFiles foreach { f => info(s"Found ${f.getName}")}
+ RegisterEngine.registerEngine(
+ ca.common.manifestJson,
+ jarFiles,
+ false)
+ info("Your engine is ready for training.")
+ 0
+ }
+
+ def unregister(ca: ConsoleArgs): Unit = {
+ RegisterEngine.unregisterEngine(ca.common.manifestJson)
+ }
+
+ def train(ca: ConsoleArgs): Int = {
+ Template.verifyTemplateMinVersion(new File("template.json"))
+ withRegisteredManifest(
+ ca.common.manifestJson,
+ ca.common.engineId,
+ ca.common.engineVersion) { em =>
+ RunWorkflow.newRunWorkflow(ca, em)
+ }
+ }
+
+ def deploy(ca: ConsoleArgs): Int = {
+ Template.verifyTemplateMinVersion(new File("template.json"))
+ withRegisteredManifest(
+ ca.common.manifestJson,
+ ca.common.engineId,
+ ca.common.engineVersion) { em =>
+ val variantJson = parse(Source.fromFile(ca.common.variantJson).mkString)
+ val variantId = variantJson \ "id" match {
+ case JString(s) => s
+ case _ =>
+ error("Unable to read engine variant ID from " +
+ s"${ca.common.variantJson.getCanonicalPath}. Aborting.")
+ return 1
+ }
+ val engineInstances = storage.Storage.getMetaDataEngineInstances
+ val engineInstance = ca.engineInstanceId map { eid =>
+ engineInstances.get(eid)
+ } getOrElse {
+ engineInstances.getLatestCompleted(em.id, em.version, variantId)
+ }
+ engineInstance map { r =>
+ RunServer.newRunServer(ca, em, r.id)
+ } getOrElse {
+ ca.engineInstanceId map { eid =>
+ error(
+ s"Invalid engine instance ID ${ca.engineInstanceId}. Aborting.")
+ } getOrElse {
+ error(
+ s"No valid engine instance found for engine ${em.id} " +
+ s"${em.version}.\nTry running 'train' before 'deploy'. Aborting.")
+ }
+ 1
+ }
+ }
+ }
+
+ def dashboard(ca: ConsoleArgs): Unit = {
+ info(s"Creating dashboard at ${ca.dashboard.ip}:${ca.dashboard.port}")
+ Dashboard.createDashboard(DashboardConfig(
+ ip = ca.dashboard.ip,
+ port = ca.dashboard.port))
+ }
+
+ def eventserver(ca: ConsoleArgs): Unit = {
+ info(
+ s"Creating Event Server at ${ca.eventServer.ip}:${ca.eventServer.port}")
+ EventServer.createEventServer(EventServerConfig(
+ ip = ca.eventServer.ip,
+ port = ca.eventServer.port,
+ stats = ca.eventServer.stats))
+ }
+
+ def adminserver(ca: ConsoleArgs): Unit = {
+ info(
+ s"Creating Admin Server at ${ca.adminServer.ip}:${ca.adminServer.port}")
+ AdminServer.createAdminServer(AdminServerConfig(
+ ip = ca.adminServer.ip,
+ port = ca.adminServer.port
+ ))
+ }
+
+ def undeploy(ca: ConsoleArgs): Int = {
+ val serverUrl = s"http://${ca.deploy.ip}:${ca.deploy.port}"
+ info(
+ s"Undeploying any existing engine instance at ${serverUrl}")
+ try {
+ val code = Http(s"${serverUrl}/stop").asString.code
+ code match {
+ case 200 => 0
+ case 404 =>
+ error(s"Another process is using ${serverUrl}. Unable to undeploy.")
+ 1
+ case _ =>
+ error(s"Another process is using ${serverUrl}, or an existing " +
+ s"engine server is not responding properly (HTTP ${code}). " +
+ "Unable to undeploy.")
+ 1
+ }
+ } catch {
+ case e: java.net.ConnectException =>
+ warn(s"Nothing at ${serverUrl}")
+ 0
+ case _: Throwable =>
+ error("Another process might be occupying " +
+ s"${ca.deploy.ip}:${ca.deploy.port}. Unable to undeploy.")
+ 1
+ }
+ }
+
+ def compile(ca: ConsoleArgs): Unit = {
+ // only add pioVersion to sbt if project/pio.sbt exists
+ if (new File("project", "pio-build.sbt").exists || ca.build.forceGeneratePIOSbt) {
+ FileUtils.writeLines(
+ new File("pio.sbt"),
+ Seq(
+ "// Generated automatically by pio build.",
+ "// Changes in this file will be overridden.",
+ "",
+ "pioVersion := \"" + BuildInfo.version + "\""))
+ }
+ implicit val formats = Utils.json4sDefaultFormats
+ try {
+ val engineFactory =
+ (parse(Source.fromFile("engine.json").mkString) \ "engineFactory").
+ extract[String]
+ WorkflowUtils.checkUpgrade("build", engineFactory)
+ } catch {
+ case e: Throwable => WorkflowUtils.checkUpgrade("build")
+ }
+ val sbt = detectSbt(ca)
+ info(s"Using command '${sbt}' at the current working directory to build.")
+ info("If the path above is incorrect, this process will fail.")
+ val asm =
+ if (ca.build.sbtAssemblyPackageDependency) {
+ " assemblyPackageDependency"
+ } else {
+ ""
+ }
+ val clean = if (ca.build.sbtClean) " clean" else ""
+ val buildCmd = s"${sbt} ${ca.build.sbtExtra.getOrElse("")}${clean} " +
+ (if (ca.build.uberJar) "assembly" else s"package${asm}")
+ val core = new File(s"pio-assembly-${BuildInfo.version}.jar")
+ if (ca.build.uberJar) {
+ info(s"Uber JAR enabled. Putting ${core.getName} in lib.")
+ val dst = new File("lib")
+ dst.mkdir()
+ FileUtils.copyFileToDirectory(
+ coreAssembly(ca.common.pioHome.get),
+ dst,
+ true)
+ } else {
+ if (new File("engine.json").exists()) {
+ info(s"Uber JAR disabled. Making sure lib/${core.getName} is absent.")
+ new File("lib", core.getName).delete()
+ } else {
+ info("Uber JAR disabled, but current working directory does not look " +
+ s"like an engine project directory. Please delete lib/${core.getName} manually.")
+ }
+ }
+ info(s"Going to run: ${buildCmd}")
+ try {
+ val r =
+ if (ca.common.verbose) {
+ buildCmd.!(ProcessLogger(line => info(line), line => error(line)))
+ } else {
+ buildCmd.!(ProcessLogger(
+ line => outputSbtError(line),
+ line => outputSbtError(line)))
+ }
+ if (r != 0) {
+ error(s"Return code of previous step is ${r}. Aborting.")
+ sys.exit(1)
+ }
+ info("Build finished successfully.")
+ } catch {
+ case e: java.io.IOException =>
+ error(s"${e.getMessage}")
+ sys.exit(1)
+ }
+ }
+
+ private def outputSbtError(line: String): Unit = {
+ """\[.*error.*\]""".r findFirstIn line foreach { _ => error(line) }
+ }
+
+ def run(ca: ConsoleArgs): Int = {
+ compile(ca)
+
+ val extraFiles = WorkflowUtils.thirdPartyConfFiles
+
+ val jarFiles = jarFilesForScala
+ jarFiles foreach { f => info(s"Found JAR: ${f.getName}") }
+ val allJarFiles = jarFiles.map(_.getCanonicalPath)
+ val cmd = s"${getSparkHome(ca.common.sparkHome)}/bin/spark-submit --jars " +
+ s"${allJarFiles.mkString(",")} " +
+ (if (extraFiles.size > 0) {
+ s"--files ${extraFiles.mkString(",")} "
+ } else {
+ ""
+ }) +
+ "--class " +
+ s"${ca.mainClass.get} ${ca.common.sparkPassThrough.mkString(" ")} " +
+ coreAssembly(ca.common.pioHome.get) + " " +
+ ca.common.driverPassThrough.mkString(" ")
+ val proc = Process(
+ cmd,
+ None,
+ "SPARK_YARN_USER_ENV" -> sys.env.filter(kv => kv._1.startsWith("PIO_")).
+ map(kv => s"${kv._1}=${kv._2}").mkString(","))
+ info(s"Submission command: ${cmd}")
+ val r = proc.!
+ if (r != 0) {
+ error(s"Return code of previous step is ${r}. Aborting.")
+ return 1
+ }
+ r
+ }
+
+ def status(ca: ConsoleArgs): Int = {
+ info("Inspecting PredictionIO...")
+ ca.common.pioHome map { pioHome =>
+ info(s"PredictionIO ${BuildInfo.version} is installed at $pioHome")
+ } getOrElse {
+ error("Unable to locate PredictionIO installation. Aborting.")
+ return 1
+ }
+ info("Inspecting Apache Spark...")
+ val sparkHome = getSparkHome(ca.common.sparkHome)
+ if (new File(s"$sparkHome/bin/spark-submit").exists) {
+ info(s"Apache Spark is installed at $sparkHome")
+ val sparkMinVersion = "1.3.0"
+ val sparkReleaseFile = new File(s"$sparkHome/RELEASE")
+ if (sparkReleaseFile.exists) {
+ val sparkReleaseStrings =
+ Source.fromFile(sparkReleaseFile).mkString.split(' ')
+ if (sparkReleaseStrings.length < 2) {
+ warn(stripMarginAndNewlines(
+ s"""|Apache Spark version information cannot be found (RELEASE file
+ |is empty). This is a known issue for certain vendors (e.g.
+ |Cloudera). Please make sure you are using a version of at least
+ |$sparkMinVersion."""))
+ } else {
+ val sparkReleaseVersion = sparkReleaseStrings(1)
+ val parsedMinVersion = Version.apply(sparkMinVersion)
+ val parsedCurrentVersion = Version.apply(sparkReleaseVersion)
+ if (parsedCurrentVersion >= parsedMinVersion) {
+ info(stripMarginAndNewlines(
+ s"""|Apache Spark $sparkReleaseVersion detected (meets minimum
+ |requirement of $sparkMinVersion)"""))
+ } else {
+ error(stripMarginAndNewlines(
+ s"""|Apache Spark $sparkReleaseVersion detected (does not meet
+ |minimum requirement. Aborting."""))
+ }
+ }
+ } else {
+ warn(stripMarginAndNewlines(
+ s"""|Apache Spark version information cannot be found. If you are
+ |using a developmental tree, please make sure you are using a
+ |version of at least $sparkMinVersion."""))
+ }
+ } else {
+ error("Unable to locate a proper Apache Spark installation. Aborting.")
+ return 1
+ }
+ info("Inspecting storage backend connections...")
+ try {
+ storage.Storage.verifyAllDataObjects()
+ } catch {
+ case e: Throwable =>
+ error("Unable to connect to all storage backends successfully. The " +
+ "following shows the error message from the storage backend.")
+ error(s"${e.getMessage} (${e.getClass.getName})", e)
+ error("Dumping configuration of initialized storage backend sources. " +
+ "Please make sure they are correct.")
+ storage.Storage.config.get("sources") map { src =>
+ src foreach { case (s, p) =>
+ error(s"Source Name: $s; Type: ${p.getOrElse("type", "(error)")}; " +
+ s"Configuration: ${p.getOrElse("config", "(error)")}")
+ }
+ } getOrElse {
+ error("No properly configured storage backend sources.")
+ }
+ return 1
+ }
+ info("(sleeping 5 seconds for all messages to show up...)")
+ Thread.sleep(5000)
+ info("Your system is all ready to go.")
+ 0
+ }
+
+ def upgrade(ca: ConsoleArgs): Unit = {
+ (ca.upgrade.from, ca.upgrade.to) match {
+ case ("0.8.2", "0.8.3") => {
+ Upgrade_0_8_3.runMain(ca.upgrade.oldAppId, ca.upgrade.newAppId)
+ }
+ case _ =>
+ println(s"Upgrade from version ${ca.upgrade.from} to ${ca.upgrade.to}"
+ + s" is not supported.")
+ }
+ }
+
+ def coreAssembly(pioHome: String): File = {
+ val core = s"pio-assembly-${BuildInfo.version}.jar"
+ val coreDir =
+ if (new File(pioHome + File.separator + "RELEASE").exists) {
+ new File(pioHome + File.separator + "lib")
+ } else {
+ new File(pioHome + File.separator + "assembly")
+ }
+ val coreFile = new File(coreDir, core)
+ if (coreFile.exists) {
+ coreFile
+ } else {
+ error(s"PredictionIO Core Assembly (${coreFile.getCanonicalPath}) does " +
+ "not exist. Aborting.")
+ sys.exit(1)
+ }
+ }
+
+ val manifestAutogenTag = "pio-autogen-manifest"
+
+ def regenerateManifestJson(json: File): Unit = {
+ val cwd = sys.props("user.dir")
+ val ha = java.security.MessageDigest.getInstance("SHA-1").
+ digest(cwd.getBytes).map("%02x".format(_)).mkString
+ if (json.exists) {
+ val em = readManifestJson(json)
+ if (em.description == Some(manifestAutogenTag) && ha != em.version) {
+ warn("This engine project directory contains an auto-generated " +
+ "manifest that has been copied/moved from another location. ")
+ warn("Regenerating the manifest to reflect the updated location. " +
+ "This will dissociate with all previous engine instances.")
+ generateManifestJson(json)
+ } else {
+ info(s"Using existing engine manifest JSON at ${json.getCanonicalPath}")
+ }
+ } else {
+ generateManifestJson(json)
+ }
+ }
+
+ def generateManifestJson(json: File): Unit = {
+ val cwd = sys.props("user.dir")
+ implicit val formats = Utils.json4sDefaultFormats +
+ new EngineManifestSerializer
+ val rand = Random.alphanumeric.take(32).mkString
+ val ha = java.security.MessageDigest.getInstance("SHA-1").
+ digest(cwd.getBytes).map("%02x".format(_)).mkString
+ val em = EngineManifest(
+ id = rand,
+ version = ha,
+ name = new File(cwd).getName,
+ description = Some(manifestAutogenTag),
+ files = Seq(),
+ engineFactory = "")
+ try {
+ FileUtils.writeStringToFile(json, write(em), "ISO-8859-1")
+ } catch {
+ case e: java.io.IOException =>
+ error(s"Cannot generate ${json} automatically (${e.getMessage}). " +
+ "Aborting.")
+ sys.exit(1)
+ }
+ }
+
+ def readManifestJson(json: File): EngineManifest = {
+ implicit val formats = Utils.json4sDefaultFormats +
+ new EngineManifestSerializer
+ try {
+ read[EngineManifest](Source.fromFile(json).mkString)
+ } catch {
+ case e: java.io.FileNotFoundException =>
+ error(s"${json.getCanonicalPath} does not exist. Aborting.")
+ sys.exit(1)
+ case e: MappingException =>
+ error(s"${json.getCanonicalPath} has invalid content: " +
+ e.getMessage)
+ sys.exit(1)
+ }
+ }
+
+ def withRegisteredManifest(
+ json: File,
+ engineId: Option[String],
+ engineVersion: Option[String])(
+ op: EngineManifest => Int): Int = {
+ val ej = readManifestJson(json)
+ val id = engineId getOrElse ej.id
+ val version = engineVersion getOrElse ej.version
+ storage.Storage.getMetaDataEngineManifests.get(id, version) map {
+ op
+ } getOrElse {
+ error(s"Engine ${id} ${version} cannot be found in the system.")
+ error("Possible reasons:")
+ error("- the engine is not yet built by the 'build' command;")
+ error("- the meta data store is offline.")
+ 1
+ }
+ }
+
+ def jarFilesAt(path: File): Array[File] = recursiveListFiles(path) filter {
+ _.getName.toLowerCase.endsWith(".jar")
+ }
+
+ def jarFilesForScala: Array[File] = {
+ val libFiles = jarFilesForScalaFilter(jarFilesAt(new File("lib")))
+ val targetFiles = jarFilesForScalaFilter(jarFilesAt(new File("target" +
+ File.separator + s"scala-${scalaVersionNoPatch}")))
+ // Use libFiles is target is empty.
+ if (targetFiles.size > 0) targetFiles else libFiles
+ }
+
+ def jarFilesForScalaFilter(jars: Array[File]): Array[File] =
+ jars.filterNot { f =>
+ f.getName.toLowerCase.endsWith("-javadoc.jar") ||
+ f.getName.toLowerCase.endsWith("-sources.jar")
+ }
+
+ def recursiveListFiles(f: File): Array[File] = {
+ Option(f.listFiles) map { these =>
+ these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles)
+ } getOrElse Array[File]()
+ }
+
+ def getSparkHome(sparkHome: Option[String]): String = {
+ sparkHome getOrElse {
+ sys.env.getOrElse("SPARK_HOME", ".")
+ }
+ }
+
+ def versionNoPatch(fullVersion: String): String = {
+ val v = """^(\d+\.\d+)""".r
+ val versionNoPatch = for {
+ v(np) <- v findFirstIn fullVersion
+ } yield np
+ versionNoPatch.getOrElse(fullVersion)
+ }
+
+ def scalaVersionNoPatch: String = versionNoPatch(BuildInfo.scalaVersion)
+
+ def detectSbt(ca: ConsoleArgs): String = {
+ ca.build.sbt map {
+ _.getCanonicalPath
+ } getOrElse {
+ val f = new File(Seq(ca.common.pioHome.get, "sbt", "sbt").mkString(
+ File.separator))
+ if (f.exists) f.getCanonicalPath else "sbt"
+ }
+ }
+
+ def stripMarginAndNewlines(string: String): String =
+ string.stripMargin.replaceAll("\n", " ")
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/console/Export.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Export.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Export.scala
new file mode 100644
index 0000000..7c0dfa4
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Export.scala
@@ -0,0 +1,42 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.console
+
+import org.apache.predictionio.tools.Runner
+
+case class ExportArgs(
+ appId: Int = 0,
+ channel: Option[String] = None,
+ outputPath: String = "",
+ format: String = "json")
+
+object Export {
+ def eventsToFile(ca: ConsoleArgs): Int = {
+ val channelArg = ca.export.channel
+ .map(ch => Seq("--channel", ch)).getOrElse(Nil)
+ Runner.runOnSpark(
+ "org.apache.predictionio.tools.export.EventsToFile",
+ Seq(
+ "--appid",
+ ca.export.appId.toString,
+ "--output",
+ ca.export.outputPath,
+ "--format",
+ ca.export.format) ++ channelArg,
+ ca,
+ Nil)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/console/Import.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Import.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Import.scala
new file mode 100644
index 0000000..185aefb
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Import.scala
@@ -0,0 +1,39 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.console
+
+import org.apache.predictionio.tools.Runner
+
+case class ImportArgs(
+ appId: Int = 0,
+ channel: Option[String] = None,
+ inputPath: String = "")
+
+object Import {
+ def fileToEvents(ca: ConsoleArgs): Int = {
+ val channelArg = ca.imprt.channel
+ .map(ch => Seq("--channel", ch)).getOrElse(Nil)
+ Runner.runOnSpark(
+ "org.apache.predictionio.tools.imprt.FileToEvents",
+ Seq(
+ "--appid",
+ ca.imprt.appId.toString,
+ "--input",
+ ca.imprt.inputPath) ++ channelArg,
+ ca,
+ Nil)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/console/Template.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Template.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Template.scala
new file mode 100644
index 0000000..f47cacf
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Template.scala
@@ -0,0 +1,429 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.console
+
+import java.io.BufferedInputStream
+import java.io.BufferedOutputStream
+import java.io.File
+import java.io.FileInputStream
+import java.io.FileOutputStream
+import java.net.ConnectException
+import java.net.URI
+import java.util.zip.ZipInputStream
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.controller.Utils
+import org.apache.predictionio.core.BuildInfo
+import org.apache.commons.io.FileUtils
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+import semverfi._
+
+import scala.io.Source
+import scala.sys.process._
+import scalaj.http._
+
+case class TemplateArgs(
+ directory: String = "",
+ repository: String = "",
+ version: Option[String] = None,
+ name: Option[String] = None,
+ packageName: Option[String] = None,
+ email: Option[String] = None)
+
+case class GitHubTag(
+ name: String,
+ zipball_url: String,
+ tarball_url: String,
+ commit: GitHubCommit)
+
+case class GitHubCommit(
+ sha: String,
+ url: String)
+
+case class GitHubCache(
+ headers: Map[String, String],
+ body: String)
+
+case class TemplateEntry(
+ repo: String)
+
+case class TemplateMetaData(
+ pioVersionMin: Option[String] = None)
+
+object Template extends Logging {
+ implicit val formats = Utils.json4sDefaultFormats
+
+ def templateMetaData(templateJson: File): TemplateMetaData = {
+ if (!templateJson.exists) {
+ warn(s"$templateJson does not exist. Template metadata will not be available. " +
+ "(This is safe to ignore if you are not working on a template.)")
+ TemplateMetaData()
+ } else {
+ val jsonString = Source.fromFile(templateJson)(scala.io.Codec.ISO8859).mkString
+ val json = try {
+ parse(jsonString)
+ } catch {
+ case e: org.json4s.ParserUtil.ParseException =>
+ warn(s"$templateJson cannot be parsed. Template metadata will not be available.")
+ return TemplateMetaData()
+ }
+ val pioVersionMin = json \ "pio" \ "version" \ "min"
+ pioVersionMin match {
+ case JString(s) => TemplateMetaData(pioVersionMin = Some(s))
+ case _ => TemplateMetaData()
+ }
+ }
+ }
+
+ /** Creates a wrapper that provides the functionality of scalaj.http.Http()
+ * with automatic proxy settings handling. The proxy settings will first
+ * come from "git" followed by system properties "http.proxyHost" and
+ * "http.proxyPort".
+ *
+ * @param url URL to be connected
+ * @return
+ */
+ def httpOptionalProxy(url: String): HttpRequest = {
+ val gitProxy = try {
+ Some(Process("git config --global http.proxy").lines.toList(0))
+ } catch {
+ case e: Throwable => None
+ }
+
+ val (host, port) = gitProxy map { p =>
+ val proxyUri = new URI(p)
+ (Option(proxyUri.getHost),
+ if (proxyUri.getPort == -1) None else Some(proxyUri.getPort))
+ } getOrElse {
+ (sys.props.get("http.proxyHost"),
+ sys.props.get("http.proxyPort").map { p =>
+ try {
+ Some(p.toInt)
+ } catch {
+ case e: NumberFormatException => None
+ }
+ } getOrElse None)
+ }
+
+ (host, port) match {
+ case (Some(h), Some(p)) => Http(url).proxy(h, p)
+ case _ => Http(url)
+ }
+ }
+
+ def getGitHubRepos(
+ repos: Seq[String],
+ apiType: String,
+ repoFilename: String): Map[String, GitHubCache] = {
+ val reposCache = try {
+ val cache =
+ Source.fromFile(repoFilename)(scala.io.Codec.ISO8859).mkString
+ read[Map[String, GitHubCache]](cache)
+ } catch {
+ case e: Throwable => Map[String, GitHubCache]()
+ }
+ val newReposCache = reposCache ++ (try {
+ repos.map { repo =>
+ val url = s"https://api.github.com/repos/$repo/$apiType"
+ val http = httpOptionalProxy(url)
+ val response = reposCache.get(repo).map { cache =>
+ cache.headers.get("ETag").map { etag =>
+ http.header("If-None-Match", etag).asString
+ } getOrElse {
+ http.asString
+ }
+ } getOrElse {
+ http.asString
+ }
+
+ val body = if (response.code == 304) {
+ reposCache(repo).body
+ } else {
+ response.body
+ }
+
+ repo -> GitHubCache(headers = response.headers, body = body)
+ }.toMap
+ } catch {
+ case e: ConnectException =>
+ githubConnectErrorMessage(e)
+ Map()
+ })
+ FileUtils.writeStringToFile(
+ new File(repoFilename),
+ write(newReposCache),
+ "ISO-8859-1")
+ newReposCache
+ }
+
+ def sub(repo: String, name: String, email: String, org: String): Unit = {
+ val data = Map(
+ "repo" -> repo,
+ "name" -> name,
+ "email" -> email,
+ "org" -> org)
+ try {
+ httpOptionalProxy("https://update.prediction.io/templates.subscribe").
+ postData("json=" + write(data)).asString
+ } catch {
+ case e: Throwable => error("Unable to subscribe.")
+ }
+ }
+
+ def meta(repo: String, name: String, org: String): Unit = {
+ try {
+ httpOptionalProxy(
+ s"https://meta.prediction.io/templates/$repo/$org/$name").asString
+ } catch {
+ case e: Throwable => debug("Template metadata unavailable.")
+ }
+ }
+
+ def list(ca: ConsoleArgs): Int = {
+ val templatesUrl = "https://templates.prediction.io/index.json"
+ try {
+ val templatesJson = Source.fromURL(templatesUrl).mkString("")
+ val templates = read[List[TemplateEntry]](templatesJson)
+ println("The following is a list of template IDs registered on " +
+ "PredictionIO Template Gallery:")
+ println()
+ templates.sortBy(_.repo.toLowerCase).foreach { template =>
+ println(template.repo)
+ }
+ println()
+ println("Notice that it is possible use any GitHub repository as your " +
+ "engine template ID (e.g. YourOrg/YourTemplate).")
+ 0
+ } catch {
+ case e: Throwable =>
+ error(s"Unable to list templates from $templatesUrl " +
+ s"(${e.getMessage}). Aborting.")
+ 1
+ }
+ }
+
+ def githubConnectErrorMessage(e: ConnectException): Unit = {
+ error(s"Unable to connect to GitHub (Reason: ${e.getMessage}). " +
+ "Please check your network configuration and proxy settings.")
+ }
+
+ def get(ca: ConsoleArgs): Int = {
+ val repos =
+ getGitHubRepos(Seq(ca.template.repository), "tags", ".templates-cache")
+
+ repos.get(ca.template.repository).map { repo =>
+ try {
+ read[List[GitHubTag]](repo.body)
+ } catch {
+ case e: MappingException =>
+ error(s"Either ${ca.template.repository} is not a valid GitHub " +
+ "repository, or it does not have any tag. Aborting.")
+ return 1
+ }
+ } getOrElse {
+ error(s"Failed to retrieve ${ca.template.repository}. Aborting.")
+ return 1
+ }
+
+ val name = ca.template.name getOrElse {
+ try {
+ Process("git config --global user.name").lines.toList(0)
+ } catch {
+ case e: Throwable =>
+ readLine("Please enter author's name: ")
+ }
+ }
+
+ val organization = ca.template.packageName getOrElse {
+ readLine(
+ "Please enter the template's Scala package name (e.g. com.mycompany): ")
+ }
+
+ val email = ca.template.email getOrElse {
+ try {
+ Process("git config --global user.email").lines.toList(0)
+ } catch {
+ case e: Throwable =>
+ readLine("Please enter author's e-mail address: ")
+ }
+ }
+
+ println(s"Author's name: $name")
+ println(s"Author's e-mail: $email")
+ println(s"Author's organization: $organization")
+
+ var subscribe = readLine("Would you like to be informed about new bug " +
+ "fixes and security updates of this template? (Y/n) ")
+ var valid = false
+
+ do {
+ subscribe match {
+ case "" | "Y" | "y" =>
+ sub(ca.template.repository, name, email, organization)
+ valid = true
+ case "n" | "N" =>
+ meta(ca.template.repository, name, organization)
+ valid = true
+ case _ =>
+ println("Please answer 'y' or 'n'")
+ subscribe = readLine("(Y/n)? ")
+ }
+ } while (!valid)
+
+ val repo = repos(ca.template.repository)
+
+ println(s"Retrieving ${ca.template.repository}")
+ val tags = read[List[GitHubTag]](repo.body)
+ println(s"There are ${tags.size} tags")
+
+ if (tags.size == 0) {
+ println(s"${ca.template.repository} does not have any tag. Aborting.")
+ return 1
+ }
+
+ val tag = ca.template.version.map { v =>
+ tags.find(_.name == v).getOrElse {
+ println(s"${ca.template.repository} does not have tag $v. Aborting.")
+ return 1
+ }
+ } getOrElse tags.head
+
+ println(s"Using tag ${tag.name}")
+ val url =
+ s"https://github.com/${ca.template.repository}/archive/${tag.name}.zip"
+ println(s"Going to download $url")
+ val trial = try {
+ httpOptionalProxy(url).asBytes
+ } catch {
+ case e: ConnectException =>
+ githubConnectErrorMessage(e)
+ return 1
+ }
+ val finalTrial = try {
+ trial.location.map { loc =>
+ println(s"Redirecting to $loc")
+ httpOptionalProxy(loc).asBytes
+ } getOrElse trial
+ } catch {
+ case e: ConnectException =>
+ githubConnectErrorMessage(e)
+ return 1
+ }
+ val zipFilename =
+ s"${ca.template.repository.replace('/', '-')}-${tag.name}.zip"
+ FileUtils.writeByteArrayToFile(
+ new File(zipFilename),
+ finalTrial.body)
+ val zis = new ZipInputStream(
+ new BufferedInputStream(new FileInputStream(zipFilename)))
+ val bufferSize = 4096
+ val filesToModify = collection.mutable.ListBuffer[String]()
+ var ze = zis.getNextEntry
+ while (ze != null) {
+ val filenameSegments = ze.getName.split(File.separatorChar)
+ val destFilename = (ca.template.directory +: filenameSegments.tail).
+ mkString(File.separator)
+ if (ze.isDirectory) {
+ new File(destFilename).mkdirs
+ } else {
+ val os = new BufferedOutputStream(
+ new FileOutputStream(destFilename),
+ bufferSize)
+ val data = Array.ofDim[Byte](bufferSize)
+ var count = zis.read(data, 0, bufferSize)
+ while (count != -1) {
+ os.write(data, 0, count)
+ count = zis.read(data, 0, bufferSize)
+ }
+ os.flush()
+ os.close()
+
+ val nameOnly = new File(destFilename).getName
+
+ if (organization != "" &&
+ (nameOnly.endsWith(".scala") ||
+ nameOnly == "build.sbt" ||
+ nameOnly == "engine.json")) {
+ filesToModify += destFilename
+ }
+ }
+ ze = zis.getNextEntry
+ }
+ zis.close()
+ new File(zipFilename).delete
+
+ val engineJsonFile =
+ new File(ca.template.directory, "engine.json")
+
+ val engineJson = try {
+ Some(parse(Source.fromFile(engineJsonFile).mkString))
+ } catch {
+ case e: java.io.IOException =>
+ error("Unable to read engine.json. Skipping automatic package " +
+ "name replacement.")
+ None
+ case e: MappingException =>
+ error("Unable to parse engine.json. Skipping automatic package " +
+ "name replacement.")
+ None
+ }
+
+ val engineFactory = engineJson.map { ej =>
+ (ej \ "engineFactory").extractOpt[String]
+ } getOrElse None
+
+ engineFactory.map { ef =>
+ val pkgName = ef.split('.').dropRight(1).mkString(".")
+ println(s"Replacing $pkgName with $organization...")
+
+ filesToModify.foreach { ftm =>
+ println(s"Processing $ftm...")
+ val fileContent = Source.fromFile(ftm).getLines()
+ val processedLines =
+ fileContent.map(_.replaceAllLiterally(pkgName, organization))
+ FileUtils.writeStringToFile(
+ new File(ftm),
+ processedLines.mkString("\n"))
+ }
+ } getOrElse {
+ error("engineFactory is not found in engine.json. Skipping automatic " +
+ "package name replacement.")
+ }
+
+ verifyTemplateMinVersion(new File(ca.template.directory, "template.json"))
+
+ println(s"Engine template ${ca.template.repository} is now ready at " +
+ ca.template.directory)
+
+ 0
+ }
+
+ def verifyTemplateMinVersion(templateJsonFile: File): Unit = {
+ val metadata = templateMetaData(templateJsonFile)
+
+ metadata.pioVersionMin.foreach { pvm =>
+ if (Version(BuildInfo.version) < Version(pvm)) {
+ error(s"This engine template requires at least PredictionIO $pvm. " +
+ s"The template may not work with PredictionIO ${BuildInfo.version}.")
+ sys.exit(1)
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala
new file mode 100644
index 0000000..aaafd8a
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala
@@ -0,0 +1,75 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.dashboard
+
+// Reference from: https://gist.github.com/waymost/4b5598523c2c7361abea
+
+import spray.http.{HttpMethods, HttpMethod, HttpResponse, AllOrigins}
+import spray.http.HttpHeaders._
+import spray.http.HttpMethods._
+import spray.http.HttpEntity
+import spray.routing._
+import spray.http.StatusCodes
+import spray.http.ContentTypes
+
+// see also https://developer.mozilla.org/en-US/docs/Web/HTTP/Access_control_CORS
+trait CORSSupport {
+ this: HttpService =>
+
+ private val allowOriginHeader = `Access-Control-Allow-Origin`(AllOrigins)
+ private val optionsCorsHeaders = List(
+ `Access-Control-Allow-Headers`("""Origin,
+ |X-Requested-With,
+ |Content-Type,
+ |Accept,
+ |Accept-Encoding,
+ |Accept-Language,
+ |Host,
+ |Referer,
+ |User-Agent""".stripMargin.replace("\n", " ")),
+ `Access-Control-Max-Age`(1728000)
+ )
+
+ def cors[T]: Directive0 = mapRequestContext { ctx =>
+ ctx.withRouteResponseHandling {
+ // OPTION request for a resource that responds to other methods
+ case Rejected(x) if (ctx.request.method.equals(HttpMethods.OPTIONS) &&
+ x.exists(_.isInstanceOf[MethodRejection])) => {
+ val allowedMethods: List[HttpMethod] = x.collect {
+ case rejection: MethodRejection => rejection.supported
+ }
+ ctx.complete {
+ HttpResponse().withHeaders(
+ `Access-Control-Allow-Methods`(HttpMethods.OPTIONS, allowedMethods :_*) ::
+ allowOriginHeader ::
+ optionsCorsHeaders
+ )
+ }
+ }
+ }.withHttpResponseHeadersMapped { headers =>
+ allowOriginHeader :: headers
+ }
+ }
+
+ override def timeoutRoute: StandardRoute = complete {
+ HttpResponse(
+ StatusCodes.InternalServerError,
+ HttpEntity(ContentTypes.`text/plain(UTF-8)`,
+ "The server was not able to produce a timely response to your request."),
+ List(allowOriginHeader)
+ )
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala
new file mode 100644
index 0000000..bfd7c64
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala
@@ -0,0 +1,156 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.dashboard
+
+import com.typesafe.config.ConfigFactory
+import org.apache.predictionio.authentication.KeyAuthentication
+import org.apache.predictionio.configuration.SSLConfiguration
+import org.apache.predictionio.data.storage.Storage
+import spray.can.server.ServerSettings
+import spray.routing.directives.AuthMagnet
+import scala.concurrent.{Future, ExecutionContext}
+import akka.actor.{ActorContext, Actor, ActorSystem, Props}
+import akka.io.IO
+import akka.pattern.ask
+import akka.util.Timeout
+import com.github.nscala_time.time.Imports.DateTime
+import grizzled.slf4j.Logging
+import spray.can.Http
+import spray.http._
+import spray.http.MediaTypes._
+import spray.routing._
+import spray.routing.authentication.{Authentication, UserPass, BasicAuth}
+
+import scala.concurrent.duration._
+
+case class DashboardConfig(
+ ip: String = "localhost",
+ port: Int = 9000)
+
+object Dashboard extends Logging with SSLConfiguration{
+ def main(args: Array[String]): Unit = {
+ val parser = new scopt.OptionParser[DashboardConfig]("Dashboard") {
+ opt[String]("ip") action { (x, c) =>
+ c.copy(ip = x)
+ } text("IP to bind to (default: localhost).")
+ opt[Int]("port") action { (x, c) =>
+ c.copy(port = x)
+ } text("Port to bind to (default: 9000).")
+ }
+
+ parser.parse(args, DashboardConfig()) map { dc =>
+ createDashboard(dc)
+ }
+ }
+
+ def createDashboard(dc: DashboardConfig): Unit = {
+ implicit val system = ActorSystem("pio-dashboard")
+ val service =
+ system.actorOf(Props(classOf[DashboardActor], dc), "dashboard")
+ implicit val timeout = Timeout(5.seconds)
+ val settings = ServerSettings(system)
+ IO(Http) ? Http.Bind(
+ service,
+ interface = dc.ip,
+ port = dc.port,
+ settings = Some(settings.copy(sslEncryption = true)))
+ system.awaitTermination
+ }
+}
+
+class DashboardActor(
+ val dc: DashboardConfig)
+ extends Actor with DashboardService {
+ def actorRefFactory: ActorContext = context
+ def receive: Actor.Receive = runRoute(dashboardRoute)
+}
+
+trait DashboardService extends HttpService with KeyAuthentication with CORSSupport {
+
+ implicit def executionContext: ExecutionContext = actorRefFactory.dispatcher
+ val dc: DashboardConfig
+ val evaluationInstances = Storage.getMetaDataEvaluationInstances
+ val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_"))
+ val serverStartTime = DateTime.now
+ val dashboardRoute =
+ path("") {
+ authenticate(withAccessKeyFromFile) { request =>
+ get {
+ respondWithMediaType(`text/html`) {
+ complete {
+ val completedInstances = evaluationInstances.getCompleted
+ html.index(
+ dc,
+ serverStartTime,
+ pioEnvVars,
+ completedInstances).toString
+ }
+ }
+ }
+ }
+ } ~
+ pathPrefix("engine_instances" / Segment) { instanceId =>
+ path("evaluator_results.txt") {
+ get {
+ respondWithMediaType(`text/plain`) {
+ evaluationInstances.get(instanceId).map { i =>
+ complete(i.evaluatorResults)
+ } getOrElse {
+ complete(StatusCodes.NotFound)
+ }
+ }
+ }
+ } ~
+ path("evaluator_results.html") {
+ get {
+ respondWithMediaType(`text/html`) {
+ evaluationInstances.get(instanceId).map { i =>
+ complete(i.evaluatorResultsHTML)
+ } getOrElse {
+ complete(StatusCodes.NotFound)
+ }
+ }
+ }
+ } ~
+ path("evaluator_results.json") {
+ get {
+ respondWithMediaType(`application/json`) {
+ evaluationInstances.get(instanceId).map { i =>
+ complete(i.evaluatorResultsJSON)
+ } getOrElse {
+ complete(StatusCodes.NotFound)
+ }
+ }
+ }
+ } ~
+ cors {
+ path("local_evaluator_results.json") {
+ get {
+ respondWithMediaType(`application/json`) {
+ evaluationInstances.get(instanceId).map { i =>
+ complete(i.evaluatorResultsJSON)
+ } getOrElse {
+ complete(StatusCodes.NotFound)
+ }
+ }
+ }
+ }
+ }
+ } ~
+ pathPrefix("assets") {
+ getFromResourceDirectory("assets")
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala b/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
new file mode 100644
index 0000000..feabce4
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
@@ -0,0 +1,104 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.export
+
+import org.apache.predictionio.controller.Utils
+import org.apache.predictionio.data.storage.EventJson4sSupport
+import org.apache.predictionio.data.storage.Storage
+import org.apache.predictionio.tools.Runner
+import org.apache.predictionio.workflow.WorkflowContext
+import org.apache.predictionio.workflow.WorkflowUtils
+
+import grizzled.slf4j.Logging
+import org.apache.spark.sql.SQLContext
+import org.json4s.native.Serialization._
+
+case class EventsToFileArgs(
+ env: String = "",
+ logFile: String = "",
+ appId: Int = 0,
+ channel: Option[String] = None,
+ outputPath: String = "",
+ format: String = "parquet",
+ verbose: Boolean = false,
+ debug: Boolean = false)
+
+object EventsToFile extends Logging {
+ def main(args: Array[String]): Unit = {
+ val parser = new scopt.OptionParser[EventsToFileArgs]("EventsToFile") {
+ opt[String]("env") action { (x, c) =>
+ c.copy(env = x)
+ }
+ opt[String]("log-file") action { (x, c) =>
+ c.copy(logFile = x)
+ }
+ opt[Int]("appid") action { (x, c) =>
+ c.copy(appId = x)
+ }
+ opt[String]("channel") action { (x, c) =>
+ c.copy(channel = Some(x))
+ }
+ opt[String]("format") action { (x, c) =>
+ c.copy(format = x)
+ }
+ opt[String]("output") action { (x, c) =>
+ c.copy(outputPath = x)
+ }
+ opt[Unit]("verbose") action { (x, c) =>
+ c.copy(verbose = true)
+ }
+ opt[Unit]("debug") action { (x, c) =>
+ c.copy(debug = true)
+ }
+ }
+ parser.parse(args, EventsToFileArgs()) map { args =>
+ // get channelId
+ val channels = Storage.getMetaDataChannels
+ val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap
+
+ val channelId: Option[Int] = args.channel.map { ch =>
+ if (!channelMap.contains(ch)) {
+ error(s"Channel ${ch} doesn't exist in this app.")
+ sys.exit(1)
+ }
+
+ channelMap(ch)
+ }
+
+ val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")
+
+ WorkflowUtils.modifyLogging(verbose = args.verbose)
+ @transient lazy implicit val formats = Utils.json4sDefaultFormats +
+ new EventJson4sSupport.APISerializer
+ val sc = WorkflowContext(
+ mode = "Export",
+ batch = "App ID " + args.appId + channelStr,
+ executorEnv = Runner.envStringToMap(args.env))
+ val sqlContext = new SQLContext(sc)
+ val events = Storage.getPEvents()
+ val eventsRdd = events.find(appId = args.appId, channelId = channelId)(sc)
+ val jsonStringRdd = eventsRdd.map(write(_))
+ if (args.format == "json") {
+ jsonStringRdd.saveAsTextFile(args.outputPath)
+ } else {
+ val jsonRdd = sqlContext.jsonRDD(jsonStringRdd)
+ jsonRdd.saveAsParquetFile(args.outputPath)
+ }
+ info(s"Events are exported to ${args.outputPath}/.")
+ info("Done.")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala b/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala
new file mode 100644
index 0000000..98a3344
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala
@@ -0,0 +1,103 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.imprt
+
+import org.apache.predictionio.controller.Utils
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventJson4sSupport
+import org.apache.predictionio.data.storage.Storage
+import org.apache.predictionio.tools.Runner
+import org.apache.predictionio.workflow.WorkflowContext
+import org.apache.predictionio.workflow.WorkflowUtils
+
+import grizzled.slf4j.Logging
+import org.json4s.native.Serialization._
+
+import scala.util.{Failure, Try}
+
+case class FileToEventsArgs(
+ env: String = "",
+ logFile: String = "",
+ appId: Int = 0,
+ channel: Option[String] = None,
+ inputPath: String = "",
+ verbose: Boolean = false,
+ debug: Boolean = false)
+
+object FileToEvents extends Logging {
+ def main(args: Array[String]): Unit = {
+ val parser = new scopt.OptionParser[FileToEventsArgs]("FileToEvents") {
+ opt[String]("env") action { (x, c) =>
+ c.copy(env = x)
+ }
+ opt[String]("log-file") action { (x, c) =>
+ c.copy(logFile = x)
+ }
+ opt[Int]("appid") action { (x, c) =>
+ c.copy(appId = x)
+ }
+ opt[String]("channel") action { (x, c) =>
+ c.copy(channel = Some(x))
+ }
+ opt[String]("input") action { (x, c) =>
+ c.copy(inputPath = x)
+ }
+ opt[Unit]("verbose") action { (x, c) =>
+ c.copy(verbose = true)
+ }
+ opt[Unit]("debug") action { (x, c) =>
+ c.copy(debug = true)
+ }
+ }
+ parser.parse(args, FileToEventsArgs()) map { args =>
+ // get channelId
+ val channels = Storage.getMetaDataChannels
+ val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap
+
+ val channelId: Option[Int] = args.channel.map { ch =>
+ if (!channelMap.contains(ch)) {
+ error(s"Channel ${ch} doesn't exist in this app.")
+ sys.exit(1)
+ }
+
+ channelMap(ch)
+ }
+
+ val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")
+
+ WorkflowUtils.modifyLogging(verbose = args.verbose)
+ @transient lazy implicit val formats = Utils.json4sDefaultFormats +
+ new EventJson4sSupport.APISerializer
+ val sc = WorkflowContext(
+ mode = "Import",
+ batch = "App ID " + args.appId + channelStr,
+ executorEnv = Runner.envStringToMap(args.env))
+ val rdd = sc.textFile(args.inputPath).filter(_.trim.nonEmpty).map { json =>
+ Try(read[Event](json)).recoverWith {
+ case e: Throwable =>
+ error(s"\nmalformed json => $json")
+ Failure(e)
+ }.get
+ }
+ val events = Storage.getPEvents()
+ events.write(events = rdd,
+ appId = args.appId,
+ channelId = channelId)(sc)
+ info("Events are imported.")
+ info("Done.")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/twirl/io/prediction/tools/console/accesskey.scala.txt
----------------------------------------------------------------------
diff --git a/tools/src/main/twirl/io/prediction/tools/console/accesskey.scala.txt b/tools/src/main/twirl/io/prediction/tools/console/accesskey.scala.txt
deleted file mode 100644
index 651dbaf..0000000
--- a/tools/src/main/twirl/io/prediction/tools/console/accesskey.scala.txt
+++ /dev/null
@@ -1,20 +0,0 @@
-Usage: pio accesskey new [--key] <app name> [<event1> <event2>...]
-
-Add allowed event(s) to an access key.
-
- --key <value>
- Specify a custom key.
- <app name>
- App to be associated with the new access key.
- <event1> <event2>...
- Allowed event name(s) to be added to the access key.
-
-Usage: pio accesskey list [<app name>]
-
- <app name>
- App name.
-
-Usage: pio accesskey delete <access key>
-
- <access key>
- The access key to be deleted.
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/twirl/io/prediction/tools/console/adminserver.scala.txt
----------------------------------------------------------------------
diff --git a/tools/src/main/twirl/io/prediction/tools/console/adminserver.scala.txt b/tools/src/main/twirl/io/prediction/tools/console/adminserver.scala.txt
deleted file mode 100644
index 4ec0237..0000000
--- a/tools/src/main/twirl/io/prediction/tools/console/adminserver.scala.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-(Experimental Only!) Usage: pio adminserver [--ip <value>] [--port <value>]
-
- --ip <value>
- IP to bind to. Default: localhost
- --port <value>
- Port to bind to. Default: 7071
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/twirl/io/prediction/tools/console/app.scala.txt
----------------------------------------------------------------------
diff --git a/tools/src/main/twirl/io/prediction/tools/console/app.scala.txt b/tools/src/main/twirl/io/prediction/tools/console/app.scala.txt
deleted file mode 100644
index 49f21b1..0000000
--- a/tools/src/main/twirl/io/prediction/tools/console/app.scala.txt
+++ /dev/null
@@ -1,74 +0,0 @@
-Usage: pio app new [--id <value>] [--description <value>] [--access-key <value>]
- <name>
-
-Create a new app key to app ID mapping.
-
- --id <value>
- Specify this if you already have data under an app ID.
- --description <value>
- Description of the new app.
- --access-key <value>
- Specify a custom default access key.
- <name>
- App name.
-
-
-Usage: pio app list
-
-List all apps.
-
-
-Usage: pio app show <name>
-
-Show details of an app.
-
- <name>
- App name.
-
-
-Usage: pio app delete <name> [--force]
-
-Name of the app to be deleted.
-
- <name>
- App name.
- --force, -f
- Delete data without prompting for confirmation
-
-
-Usage: pio app data-delete <name> [--channel <name>] [--all] [--force]
-
-Delete data of an app.
-
- <name>
- App name.
- --channel <name>
- Delete data of the specified channel (default channel if not specified)
- --all
- Delete all data of this app (including both default and all channels)
- --force, -f
- Delete data without prompting for confirmation
-
-
-Usage: pio app channel-new <name> <channel>
-
-Create a new channel for the app.
-
- <name>
- App name.
-
- <channel>
- Channel name to be created.
-
-
-Usage: pio app channel-delete <name> <channel> [--force]
-
-Delete a channel for the app.
-
- <name>
- App name.
-
- <channel>
- Channel name to be deleted.
- --force, -f
- Delete data without prompting for confirmation
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/twirl/io/prediction/tools/console/build.scala.txt
----------------------------------------------------------------------
diff --git a/tools/src/main/twirl/io/prediction/tools/console/build.scala.txt b/tools/src/main/twirl/io/prediction/tools/console/build.scala.txt
deleted file mode 100644
index be80c50..0000000
--- a/tools/src/main/twirl/io/prediction/tools/console/build.scala.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-Usage: pio build [--sbt-extra <value>] [--clean] [--no-asm]
- [common options...]
-
-Build an engine at the current directory.
-
- --sbt-extra <value>
- Extra command to pass to SBT when it builds your engine.
- --clean
- Clean build.
- --no-asm
- Skip building external dependencies assembly.