You are viewing a plain text version of this content. The canonical link was not preserved in this plain text copy.
Posted to commits@predictionio.apache.org by ch...@apache.org on 2017/09/28 15:55:31 UTC

[50/57] [abbrv] incubator-predictionio git commit: [PIO-120] ensures the train process exits gracefully after an Elasticsearch connection error

[PIO-120] ensures the train process exits gracefully after an Elasticsearch connection error

Closes #432


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/34518365
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/34518365
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/34518365

Branch: refs/heads/livedoc
Commit: 3451836584e16f324dc90f973846618e6a5bc3c9
Parents: a0a2c12
Author: Mars Hall <ma...@heroku.com>
Authored: Mon Sep 11 17:13:26 2017 -0700
Committer: Mars Hall <ma...@heroku.com>
Committed: Mon Sep 11 17:13:26 2017 -0700

----------------------------------------------------------------------
 .../predictionio/workflow/CreateWorkflow.scala  | 238 ++++++++++---------
 1 file changed, 121 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/34518365/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
index 303ed06..c5f32a6 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
@@ -134,144 +134,148 @@ object CreateWorkflow extends Logging {
   }
 
   def main(args: Array[String]): Unit = {
-    val wfcOpt = parser.parse(args, WorkflowConfig())
-    if (wfcOpt.isEmpty) {
-      logger.error("WorkflowConfig is empty. Quitting")
-      return
-    }
+    try {
+      val wfcOpt = parser.parse(args, WorkflowConfig())
+      if (wfcOpt.isEmpty) {
+        logger.error("WorkflowConfig is empty. Quitting")
+        return
+      }
 
-    val wfc = wfcOpt.get
+      val wfc = wfcOpt.get
 
-    WorkflowUtils.modifyLogging(wfc.verbose)
+      WorkflowUtils.modifyLogging(wfc.verbose)
 
-    val evaluation = wfc.evaluationClass.map { ec =>
-      try {
-        WorkflowUtils.getEvaluation(ec, getClass.getClassLoader)._2
-      } catch {
-        case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
-          error(s"Unable to obtain evaluation $ec. Aborting workflow.", e)
-          sys.exit(1)
+      val evaluation = wfc.evaluationClass.map { ec =>
+        try {
+          WorkflowUtils.getEvaluation(ec, getClass.getClassLoader)._2
+        } catch {
+          case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
+            error(s"Unable to obtain evaluation $ec. Aborting workflow.", e)
+            sys.exit(1)
+        }
       }
-    }
 
-    val engineParamsGenerator = wfc.engineParamsGeneratorClass.map { epg =>
-      try {
-        WorkflowUtils.getEngineParamsGenerator(epg, getClass.getClassLoader)._2
-      } catch {
-        case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
-          error(s"Unable to obtain engine parameters generator $epg. " +
-            "Aborting workflow.", e)
-          sys.exit(1)
+      val engineParamsGenerator = wfc.engineParamsGeneratorClass.map { epg =>
+        try {
+          WorkflowUtils.getEngineParamsGenerator(epg, getClass.getClassLoader)._2
+        } catch {
+          case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
+            error(s"Unable to obtain engine parameters generator $epg. " +
+              "Aborting workflow.", e)
+            sys.exit(1)
+        }
       }
-    }
 
-    val pioEnvVars = wfc.env.map { e =>
-      e.split(',').flatMap { p =>
-        p.split('=') match {
-          case Array(k, v) => List(k -> v)
-          case _ => Nil
-        }
-      }.toMap
-    }.getOrElse(Map.empty)
+      val pioEnvVars = wfc.env.map { e =>
+        e.split(',').flatMap { p =>
+          p.split('=') match {
+            case Array(k, v) => List(k -> v)
+            case _ => Nil
+          }
+        }.toMap
+      }.getOrElse(Map.empty)
 
-    if (evaluation.isEmpty) {
-      val variantJson = parse(stringFromFile(wfc.engineVariant))
-      val engineFactory = if (wfc.engineFactory == "") {
-        variantJson \ "engineFactory" match {
+      if (evaluation.isEmpty) {
+        val variantJson = parse(stringFromFile(wfc.engineVariant))
+        val engineFactory = if (wfc.engineFactory == "") {
+          variantJson \ "engineFactory" match {
+            case JString(s) => s
+            case _ =>
+              error("Unable to read engine factory class name from " +
+                s"${wfc.engineVariant}. Aborting.")
+              sys.exit(1)
+          }
+        } else wfc.engineFactory
+        val variantId = variantJson \ "id" match {
           case JString(s) => s
           case _ =>
-            error("Unable to read engine factory class name from " +
+            error("Unable to read engine variant ID from " +
               s"${wfc.engineVariant}. Aborting.")
             sys.exit(1)
         }
-      } else wfc.engineFactory
-      val variantId = variantJson \ "id" match {
-        case JString(s) => s
-        case _ =>
-          error("Unable to read engine variant ID from " +
-            s"${wfc.engineVariant}. Aborting.")
-          sys.exit(1)
-      }
-      val (engineLanguage, engineFactoryObj) = try {
-        WorkflowUtils.getEngine(engineFactory, getClass.getClassLoader)
-      } catch {
-        case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
-          error(s"Unable to obtain engine: ${e.getMessage}. Aborting workflow.")
-          sys.exit(1)
-      }
+        val (engineLanguage, engineFactoryObj) = try {
+          WorkflowUtils.getEngine(engineFactory, getClass.getClassLoader)
+        } catch {
+          case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
+            error(s"Unable to obtain engine: ${e.getMessage}. Aborting workflow.")
+            sys.exit(1)
+        }
 
-      val engine: BaseEngine[_, _, _, _] = engineFactoryObj()
+        val engine: BaseEngine[_, _, _, _] = engineFactoryObj()
 
-      val customSparkConf = WorkflowUtils.extractSparkConf(variantJson)
-      val workflowParams = WorkflowParams(
-        verbose = wfc.verbosity,
-        skipSanityCheck = wfc.skipSanityCheck,
-        stopAfterRead = wfc.stopAfterRead,
-        stopAfterPrepare = wfc.stopAfterPrepare,
-        sparkEnv = WorkflowParams().sparkEnv ++ customSparkConf)
+        val customSparkConf = WorkflowUtils.extractSparkConf(variantJson)
+        val workflowParams = WorkflowParams(
+          verbose = wfc.verbosity,
+          skipSanityCheck = wfc.skipSanityCheck,
+          stopAfterRead = wfc.stopAfterRead,
+          stopAfterPrepare = wfc.stopAfterPrepare,
+          sparkEnv = WorkflowParams().sparkEnv ++ customSparkConf)
 
-      // Evaluator Not Specified. Do training.
-      if (!engine.isInstanceOf[Engine[_,_,_,_,_,_]]) {
-        throw new NoSuchMethodException(s"Engine $engine is not trainable")
-      }
+        // Evaluator Not Specified. Do training.
+        if (!engine.isInstanceOf[Engine[_,_,_,_,_,_]]) {
+          throw new NoSuchMethodException(s"Engine $engine is not trainable")
+        }
 
-      val trainableEngine = engine.asInstanceOf[Engine[_, _, _, _, _, _]]
+        val trainableEngine = engine.asInstanceOf[Engine[_, _, _, _, _, _]]
 
-      val engineParams = if (wfc.engineParamsKey == "") {
-        trainableEngine.jValueToEngineParams(variantJson, wfc.jsonExtractor)
-      } else {
-        engineFactoryObj.engineParams(wfc.engineParamsKey)
-      }
+        val engineParams = if (wfc.engineParamsKey == "") {
+          trainableEngine.jValueToEngineParams(variantJson, wfc.jsonExtractor)
+        } else {
+          engineFactoryObj.engineParams(wfc.engineParamsKey)
+        }
 
-      val engineInstance = EngineInstance(
-        id = "",
-        status = "INIT",
-        startTime = DateTime.now,
-        endTime = DateTime.now,
-        engineId = wfc.engineId,
-        engineVersion = wfc.engineVersion,
-        engineVariant = variantId,
-        engineFactory = engineFactory,
-        batch = wfc.batch,
-        env = pioEnvVars,
-        sparkConf = workflowParams.sparkEnv,
-        dataSourceParams =
-          JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams),
-        preparatorParams =
-          JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams),
-        algorithmsParams =
-          JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList),
-        servingParams =
-          JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams))
+        val engineInstance = EngineInstance(
+          id = "",
+          status = "INIT",
+          startTime = DateTime.now,
+          endTime = DateTime.now,
+          engineId = wfc.engineId,
+          engineVersion = wfc.engineVersion,
+          engineVariant = variantId,
+          engineFactory = engineFactory,
+          batch = wfc.batch,
+          env = pioEnvVars,
+          sparkConf = workflowParams.sparkEnv,
+          dataSourceParams =
+            JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams),
+          preparatorParams =
+            JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams),
+          algorithmsParams =
+            JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList),
+          servingParams =
+            JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams))
 
-      val engineInstanceId = Storage.getMetaDataEngineInstances.insert(
-        engineInstance)
+        val engineInstanceId = Storage.getMetaDataEngineInstances.insert(
+          engineInstance)
 
-      CoreWorkflow.runTrain(
-        env = pioEnvVars,
-        params = workflowParams,
-        engine = trainableEngine,
-        engineParams = engineParams,
-        engineInstance = engineInstance.copy(id = engineInstanceId))
-    } else {
-      val workflowParams = WorkflowParams(
-        verbose = wfc.verbosity,
-        skipSanityCheck = wfc.skipSanityCheck,
-        stopAfterRead = wfc.stopAfterRead,
-        stopAfterPrepare = wfc.stopAfterPrepare,
-        sparkEnv = WorkflowParams().sparkEnv)
-      val evaluationInstance = EvaluationInstance(
-        evaluationClass = wfc.evaluationClass.get,
-        engineParamsGeneratorClass = wfc.engineParamsGeneratorClass.get,
-        batch = wfc.batch,
-        env = pioEnvVars,
-        sparkConf = workflowParams.sparkEnv
-      )
-      Workflow.runEvaluation(
-        evaluation = evaluation.get,
-        engineParamsGenerator = engineParamsGenerator.get,
-        evaluationInstance = evaluationInstance,
-        params = workflowParams)
+        CoreWorkflow.runTrain(
+          env = pioEnvVars,
+          params = workflowParams,
+          engine = trainableEngine,
+          engineParams = engineParams,
+          engineInstance = engineInstance.copy(id = engineInstanceId))
+      } else {
+        val workflowParams = WorkflowParams(
+          verbose = wfc.verbosity,
+          skipSanityCheck = wfc.skipSanityCheck,
+          stopAfterRead = wfc.stopAfterRead,
+          stopAfterPrepare = wfc.stopAfterPrepare,
+          sparkEnv = WorkflowParams().sparkEnv)
+        val evaluationInstance = EvaluationInstance(
+          evaluationClass = wfc.evaluationClass.get,
+          engineParamsGeneratorClass = wfc.engineParamsGeneratorClass.get,
+          batch = wfc.batch,
+          env = pioEnvVars,
+          sparkConf = workflowParams.sparkEnv
+        )
+        Workflow.runEvaluation(
+          evaluation = evaluation.get,
+          engineParamsGenerator = engineParamsGenerator.get,
+          evaluationInstance = evaluationInstance,
+          params = workflowParams)
+      }
+    } finally {
+      CleanupFunctions.run()
     }
   }
 }