You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/11/13 16:03:16 UTC

[GitHub] chetanmeh closed pull request #4109: update mesos-actor; cleanup orphaned failed task launches

chetanmeh closed pull request #4109: update mesos-actor; cleanup orphaned failed task launches
URL: https://github.com/apache/incubator-openwhisk/pull/4109
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it):

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 2ae4a2963d..500e3227c7 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -64,7 +64,7 @@ dependencies {
     compile 'io.kamon:kamon-core_2.12:0.6.7'
     compile 'io.kamon:kamon-statsd_2.12:0.6.7'
     //for mesos
-    compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.13'
+    compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.14'
 
     //tracing support
     compile 'io.opentracing:opentracing-api:0.31.0'
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
index b78e33fc49..373b123f97 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
@@ -37,8 +37,6 @@ import com.adobe.api.platform.runtime.mesos.SubmitTask
 import com.adobe.api.platform.runtime.mesos.TaskDef
 import com.adobe.api.platform.runtime.mesos.User
 import java.time.Instant
-import org.apache.mesos.v1.Protos.TaskState
-import org.apache.mesos.v1.Protos.TaskStatus
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.util.Failure
@@ -142,9 +140,14 @@ object MesosTask {
           transid.finished(this, start, s"launched task ${taskId} at ${taskDetails.hostname}:${taskDetails
             .hostports(0)}", logLevel = InfoLevel)
         case Failure(ate: AskTimeoutException) =>
-          transid.failed(this, start, ate.getMessage, ErrorLevel)
+          transid.failed(this, start, s"task launch timed out ${ate.getMessage}", ErrorLevel)
           MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(LAUNCH_CMD))
-        case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
+          //kill the task whose launch timed out
+          destroy(mesosClientActor, mesosConfig, taskId)
+        case Failure(t) =>
+          //kill the task whose launch timed out
+          destroy(mesosClientActor, mesosConfig, taskId)
+          transid.failed(this, start, s"task launch failed ${t.getMessage}", ErrorLevel)
       }
       .map(taskDetails => {
         val taskHost = taskDetails.hostname
@@ -155,7 +158,29 @@ object MesosTask {
       })
 
   }
+  private def destroy(mesosClientActor: ActorRef, mesosConfig: MesosConfig, taskId: String)(
+    implicit transid: TransactionId,
+    logging: Logging,
+    ec: ExecutionContext): Future[Unit] = {
+    val taskDeleteTimeout = Timeout(mesosConfig.timeouts.taskDelete)
 
+    val start = transid.started(
+      this,
+      LoggingMarkers.INVOKER_MESOS_CMD(MesosTask.KILL_CMD),
+      s"killing mesos taskid $taskId (timeout: ${taskDeleteTimeout})",
+      logLevel = InfoLevel)
+
+    mesosClientActor
+      .ask(DeleteTask(taskId))(taskDeleteTimeout)
+      .andThen {
+        case Success(_) => transid.finished(this, start, logLevel = InfoLevel)
+        case Failure(ate: AskTimeoutException) =>
+          transid.failed(this, start, s"task destroy timed out ${ate.getMessage}", ErrorLevel)
+          MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(MesosTask.KILL_CMD))
+        case Failure(t) => transid.failed(this, start, s"task destroy failed ${t.getMessage}", ErrorLevel)
+      }
+      .map(_ => {})
+  }
 }
 
 object JsonFormatters extends DefaultJsonProtocol {
@@ -171,7 +196,6 @@ class MesosTask(override protected val id: ContainerId,
                 mesosClientActor: ActorRef,
                 mesosConfig: MesosConfig)
     extends Container {
-  val taskDeleteTimeout = Timeout(mesosConfig.timeouts.taskLaunch)
 
   /** Stops the container from consuming CPU cycles. */
   override def suspend()(implicit transid: TransactionId): Future[Unit] = {
@@ -187,30 +211,7 @@ class MesosTask(override protected val id: ContainerId,
 
   /** Completely destroys this instance of the container. */
   override def destroy()(implicit transid: TransactionId): Future[Unit] = {
-    val start = transid.started(
-      this,
-      LoggingMarkers.INVOKER_MESOS_CMD(MesosTask.KILL_CMD),
-      s"killing mesos taskid $taskId (timeout: ${taskDeleteTimeout})",
-      logLevel = InfoLevel)
-
-    mesosClientActor
-      .ask(DeleteTask(taskId))(taskDeleteTimeout)
-      .mapTo[TaskStatus]
-      .andThen {
-        case Success(_) => transid.finished(this, start, logLevel = InfoLevel)
-        case Failure(ate: AskTimeoutException) =>
-          transid.failed(this, start, ate.getMessage, ErrorLevel)
-          MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(MesosTask.KILL_CMD))
-        case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
-      }
-      .map(taskStatus => {
-        // verify that task ended in TASK_KILLED state (but don't fail if it didn't...)
-        if (taskStatus.getState != TaskState.TASK_KILLED) {
-          logging.error(this, s"task kill resulted in unexpected state ${taskStatus.getState}")
-        } else {
-          logging.info(this, s"task killed ended with state ${taskStatus.getState}")
-        }
-      })(ec)
+    MesosTask.destroy(mesosClientActor, mesosConfig, taskId)
   }
 
   /**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services