You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by ma...@apache.org on 2017/09/12 00:13:38 UTC
incubator-predictionio git commit: [PIO-120] ensures the train process exits gracefully after an Elasticsearch connection error
Repository: incubator-predictionio
Updated Branches:
refs/heads/develop a0a2c1261 -> 345183658
[PIO-120] ensures the train process exits gracefully after an Elasticsearch connection error
Closes #432
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/34518365
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/34518365
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/34518365
Branch: refs/heads/develop
Commit: 3451836584e16f324dc90f973846618e6a5bc3c9
Parents: a0a2c12
Author: Mars Hall <ma...@heroku.com>
Authored: Mon Sep 11 17:13:26 2017 -0700
Committer: Mars Hall <ma...@heroku.com>
Committed: Mon Sep 11 17:13:26 2017 -0700
----------------------------------------------------------------------
.../predictionio/workflow/CreateWorkflow.scala | 238 ++++++++++---------
1 file changed, 121 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/34518365/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
index 303ed06..c5f32a6 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
@@ -134,144 +134,148 @@ object CreateWorkflow extends Logging {
}
def main(args: Array[String]): Unit = {
- val wfcOpt = parser.parse(args, WorkflowConfig())
- if (wfcOpt.isEmpty) {
- logger.error("WorkflowConfig is empty. Quitting")
- return
- }
+ try {
+ val wfcOpt = parser.parse(args, WorkflowConfig())
+ if (wfcOpt.isEmpty) {
+ logger.error("WorkflowConfig is empty. Quitting")
+ return
+ }
- val wfc = wfcOpt.get
+ val wfc = wfcOpt.get
- WorkflowUtils.modifyLogging(wfc.verbose)
+ WorkflowUtils.modifyLogging(wfc.verbose)
- val evaluation = wfc.evaluationClass.map { ec =>
- try {
- WorkflowUtils.getEvaluation(ec, getClass.getClassLoader)._2
- } catch {
- case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
- error(s"Unable to obtain evaluation $ec. Aborting workflow.", e)
- sys.exit(1)
+ val evaluation = wfc.evaluationClass.map { ec =>
+ try {
+ WorkflowUtils.getEvaluation(ec, getClass.getClassLoader)._2
+ } catch {
+ case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
+ error(s"Unable to obtain evaluation $ec. Aborting workflow.", e)
+ sys.exit(1)
+ }
}
- }
- val engineParamsGenerator = wfc.engineParamsGeneratorClass.map { epg =>
- try {
- WorkflowUtils.getEngineParamsGenerator(epg, getClass.getClassLoader)._2
- } catch {
- case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
- error(s"Unable to obtain engine parameters generator $epg. " +
- "Aborting workflow.", e)
- sys.exit(1)
+ val engineParamsGenerator = wfc.engineParamsGeneratorClass.map { epg =>
+ try {
+ WorkflowUtils.getEngineParamsGenerator(epg, getClass.getClassLoader)._2
+ } catch {
+ case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
+ error(s"Unable to obtain engine parameters generator $epg. " +
+ "Aborting workflow.", e)
+ sys.exit(1)
+ }
}
- }
- val pioEnvVars = wfc.env.map { e =>
- e.split(',').flatMap { p =>
- p.split('=') match {
- case Array(k, v) => List(k -> v)
- case _ => Nil
- }
- }.toMap
- }.getOrElse(Map.empty)
+ val pioEnvVars = wfc.env.map { e =>
+ e.split(',').flatMap { p =>
+ p.split('=') match {
+ case Array(k, v) => List(k -> v)
+ case _ => Nil
+ }
+ }.toMap
+ }.getOrElse(Map.empty)
- if (evaluation.isEmpty) {
- val variantJson = parse(stringFromFile(wfc.engineVariant))
- val engineFactory = if (wfc.engineFactory == "") {
- variantJson \ "engineFactory" match {
+ if (evaluation.isEmpty) {
+ val variantJson = parse(stringFromFile(wfc.engineVariant))
+ val engineFactory = if (wfc.engineFactory == "") {
+ variantJson \ "engineFactory" match {
+ case JString(s) => s
+ case _ =>
+ error("Unable to read engine factory class name from " +
+ s"${wfc.engineVariant}. Aborting.")
+ sys.exit(1)
+ }
+ } else wfc.engineFactory
+ val variantId = variantJson \ "id" match {
case JString(s) => s
case _ =>
- error("Unable to read engine factory class name from " +
+ error("Unable to read engine variant ID from " +
s"${wfc.engineVariant}. Aborting.")
sys.exit(1)
}
- } else wfc.engineFactory
- val variantId = variantJson \ "id" match {
- case JString(s) => s
- case _ =>
- error("Unable to read engine variant ID from " +
- s"${wfc.engineVariant}. Aborting.")
- sys.exit(1)
- }
- val (engineLanguage, engineFactoryObj) = try {
- WorkflowUtils.getEngine(engineFactory, getClass.getClassLoader)
- } catch {
- case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
- error(s"Unable to obtain engine: ${e.getMessage}. Aborting workflow.")
- sys.exit(1)
- }
+ val (engineLanguage, engineFactoryObj) = try {
+ WorkflowUtils.getEngine(engineFactory, getClass.getClassLoader)
+ } catch {
+ case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
+ error(s"Unable to obtain engine: ${e.getMessage}. Aborting workflow.")
+ sys.exit(1)
+ }
- val engine: BaseEngine[_, _, _, _] = engineFactoryObj()
+ val engine: BaseEngine[_, _, _, _] = engineFactoryObj()
- val customSparkConf = WorkflowUtils.extractSparkConf(variantJson)
- val workflowParams = WorkflowParams(
- verbose = wfc.verbosity,
- skipSanityCheck = wfc.skipSanityCheck,
- stopAfterRead = wfc.stopAfterRead,
- stopAfterPrepare = wfc.stopAfterPrepare,
- sparkEnv = WorkflowParams().sparkEnv ++ customSparkConf)
+ val customSparkConf = WorkflowUtils.extractSparkConf(variantJson)
+ val workflowParams = WorkflowParams(
+ verbose = wfc.verbosity,
+ skipSanityCheck = wfc.skipSanityCheck,
+ stopAfterRead = wfc.stopAfterRead,
+ stopAfterPrepare = wfc.stopAfterPrepare,
+ sparkEnv = WorkflowParams().sparkEnv ++ customSparkConf)
- // Evaluator Not Specified. Do training.
- if (!engine.isInstanceOf[Engine[_,_,_,_,_,_]]) {
- throw new NoSuchMethodException(s"Engine $engine is not trainable")
- }
+ // Evaluator Not Specified. Do training.
+ if (!engine.isInstanceOf[Engine[_,_,_,_,_,_]]) {
+ throw new NoSuchMethodException(s"Engine $engine is not trainable")
+ }
- val trainableEngine = engine.asInstanceOf[Engine[_, _, _, _, _, _]]
+ val trainableEngine = engine.asInstanceOf[Engine[_, _, _, _, _, _]]
- val engineParams = if (wfc.engineParamsKey == "") {
- trainableEngine.jValueToEngineParams(variantJson, wfc.jsonExtractor)
- } else {
- engineFactoryObj.engineParams(wfc.engineParamsKey)
- }
+ val engineParams = if (wfc.engineParamsKey == "") {
+ trainableEngine.jValueToEngineParams(variantJson, wfc.jsonExtractor)
+ } else {
+ engineFactoryObj.engineParams(wfc.engineParamsKey)
+ }
- val engineInstance = EngineInstance(
- id = "",
- status = "INIT",
- startTime = DateTime.now,
- endTime = DateTime.now,
- engineId = wfc.engineId,
- engineVersion = wfc.engineVersion,
- engineVariant = variantId,
- engineFactory = engineFactory,
- batch = wfc.batch,
- env = pioEnvVars,
- sparkConf = workflowParams.sparkEnv,
- dataSourceParams =
- JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams),
- preparatorParams =
- JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams),
- algorithmsParams =
- JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList),
- servingParams =
- JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams))
+ val engineInstance = EngineInstance(
+ id = "",
+ status = "INIT",
+ startTime = DateTime.now,
+ endTime = DateTime.now,
+ engineId = wfc.engineId,
+ engineVersion = wfc.engineVersion,
+ engineVariant = variantId,
+ engineFactory = engineFactory,
+ batch = wfc.batch,
+ env = pioEnvVars,
+ sparkConf = workflowParams.sparkEnv,
+ dataSourceParams =
+ JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams),
+ preparatorParams =
+ JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams),
+ algorithmsParams =
+ JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList),
+ servingParams =
+ JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams))
- val engineInstanceId = Storage.getMetaDataEngineInstances.insert(
- engineInstance)
+ val engineInstanceId = Storage.getMetaDataEngineInstances.insert(
+ engineInstance)
- CoreWorkflow.runTrain(
- env = pioEnvVars,
- params = workflowParams,
- engine = trainableEngine,
- engineParams = engineParams,
- engineInstance = engineInstance.copy(id = engineInstanceId))
- } else {
- val workflowParams = WorkflowParams(
- verbose = wfc.verbosity,
- skipSanityCheck = wfc.skipSanityCheck,
- stopAfterRead = wfc.stopAfterRead,
- stopAfterPrepare = wfc.stopAfterPrepare,
- sparkEnv = WorkflowParams().sparkEnv)
- val evaluationInstance = EvaluationInstance(
- evaluationClass = wfc.evaluationClass.get,
- engineParamsGeneratorClass = wfc.engineParamsGeneratorClass.get,
- batch = wfc.batch,
- env = pioEnvVars,
- sparkConf = workflowParams.sparkEnv
- )
- Workflow.runEvaluation(
- evaluation = evaluation.get,
- engineParamsGenerator = engineParamsGenerator.get,
- evaluationInstance = evaluationInstance,
- params = workflowParams)
+ CoreWorkflow.runTrain(
+ env = pioEnvVars,
+ params = workflowParams,
+ engine = trainableEngine,
+ engineParams = engineParams,
+ engineInstance = engineInstance.copy(id = engineInstanceId))
+ } else {
+ val workflowParams = WorkflowParams(
+ verbose = wfc.verbosity,
+ skipSanityCheck = wfc.skipSanityCheck,
+ stopAfterRead = wfc.stopAfterRead,
+ stopAfterPrepare = wfc.stopAfterPrepare,
+ sparkEnv = WorkflowParams().sparkEnv)
+ val evaluationInstance = EvaluationInstance(
+ evaluationClass = wfc.evaluationClass.get,
+ engineParamsGeneratorClass = wfc.engineParamsGeneratorClass.get,
+ batch = wfc.batch,
+ env = pioEnvVars,
+ sparkConf = workflowParams.sparkEnv
+ )
+ Workflow.runEvaluation(
+ evaluation = evaluation.get,
+ engineParamsGenerator = engineParamsGenerator.get,
+ evaluationInstance = evaluationInstance,
+ params = workflowParams)
+ }
+ } finally {
+ CleanupFunctions.run()
}
}
}