You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/11/13 16:03:16 UTC
[GitHub] chetanmeh closed pull request #4109: update mesos-actor; cleanup orphaned failed task launches
chetanmeh closed pull request #4109: update mesos-actor; cleanup orphaned failed task launches
URL: https://github.com/apache/incubator-openwhisk/pull/4109
This is a PR merged from a forked repository.
Because GitHub hides the original diff once a pull request from a fork is merged,
the diff is reproduced below for the sake of provenance:
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 2ae4a2963d..500e3227c7 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -64,7 +64,7 @@ dependencies {
compile 'io.kamon:kamon-core_2.12:0.6.7'
compile 'io.kamon:kamon-statsd_2.12:0.6.7'
//for mesos
- compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.13'
+ compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.14'
//tracing support
compile 'io.opentracing:opentracing-api:0.31.0'
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
index b78e33fc49..373b123f97 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
@@ -37,8 +37,6 @@ import com.adobe.api.platform.runtime.mesos.SubmitTask
import com.adobe.api.platform.runtime.mesos.TaskDef
import com.adobe.api.platform.runtime.mesos.User
import java.time.Instant
-import org.apache.mesos.v1.Protos.TaskState
-import org.apache.mesos.v1.Protos.TaskStatus
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Failure
@@ -142,9 +140,14 @@ object MesosTask {
transid.finished(this, start, s"launched task ${taskId} at ${taskDetails.hostname}:${taskDetails
.hostports(0)}", logLevel = InfoLevel)
case Failure(ate: AskTimeoutException) =>
- transid.failed(this, start, ate.getMessage, ErrorLevel)
+ transid.failed(this, start, s"task launch timed out ${ate.getMessage}", ErrorLevel)
MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(LAUNCH_CMD))
- case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
+ //kill the task whose launch timed out
+ destroy(mesosClientActor, mesosConfig, taskId)
+ case Failure(t) =>
+ //kill the task whose launch failed
+ destroy(mesosClientActor, mesosConfig, taskId)
+ transid.failed(this, start, s"task launch failed ${t.getMessage}", ErrorLevel)
}
.map(taskDetails => {
val taskHost = taskDetails.hostname
@@ -155,7 +158,29 @@ object MesosTask {
})
}
+ private def destroy(mesosClientActor: ActorRef, mesosConfig: MesosConfig, taskId: String)(
+ implicit transid: TransactionId,
+ logging: Logging,
+ ec: ExecutionContext): Future[Unit] = {
+ val taskDeleteTimeout = Timeout(mesosConfig.timeouts.taskDelete)
+ val start = transid.started(
+ this,
+ LoggingMarkers.INVOKER_MESOS_CMD(MesosTask.KILL_CMD),
+ s"killing mesos taskid $taskId (timeout: ${taskDeleteTimeout})",
+ logLevel = InfoLevel)
+
+ mesosClientActor
+ .ask(DeleteTask(taskId))(taskDeleteTimeout)
+ .andThen {
+ case Success(_) => transid.finished(this, start, logLevel = InfoLevel)
+ case Failure(ate: AskTimeoutException) =>
+ transid.failed(this, start, s"task destroy timed out ${ate.getMessage}", ErrorLevel)
+ MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(MesosTask.KILL_CMD))
+ case Failure(t) => transid.failed(this, start, s"task destroy failed ${t.getMessage}", ErrorLevel)
+ }
+ .map(_ => {})
+ }
}
object JsonFormatters extends DefaultJsonProtocol {
@@ -171,7 +196,6 @@ class MesosTask(override protected val id: ContainerId,
mesosClientActor: ActorRef,
mesosConfig: MesosConfig)
extends Container {
- val taskDeleteTimeout = Timeout(mesosConfig.timeouts.taskLaunch)
/** Stops the container from consuming CPU cycles. */
override def suspend()(implicit transid: TransactionId): Future[Unit] = {
@@ -187,30 +211,7 @@ class MesosTask(override protected val id: ContainerId,
/** Completely destroys this instance of the container. */
override def destroy()(implicit transid: TransactionId): Future[Unit] = {
- val start = transid.started(
- this,
- LoggingMarkers.INVOKER_MESOS_CMD(MesosTask.KILL_CMD),
- s"killing mesos taskid $taskId (timeout: ${taskDeleteTimeout})",
- logLevel = InfoLevel)
-
- mesosClientActor
- .ask(DeleteTask(taskId))(taskDeleteTimeout)
- .mapTo[TaskStatus]
- .andThen {
- case Success(_) => transid.finished(this, start, logLevel = InfoLevel)
- case Failure(ate: AskTimeoutException) =>
- transid.failed(this, start, ate.getMessage, ErrorLevel)
- MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(MesosTask.KILL_CMD))
- case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
- }
- .map(taskStatus => {
- // verify that task ended in TASK_KILLED state (but don't fail if it didn't...)
- if (taskStatus.getState != TaskState.TASK_KILLED) {
- logging.error(this, s"task kill resulted in unexpected state ${taskStatus.getState}")
- } else {
- logging.info(this, s"task killed ended with state ${taskStatus.getState}")
- }
- })(ec)
+ MesosTask.destroy(mesosClientActor, mesosConfig, taskId)
}
/**
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services