You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2018/01/02 22:40:30 UTC
[incubator-openwhisk] branch master updated: fix memory drag in
PrimitiveActions, and clean up withAlternateAfterTimeout (#3129)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 1293647 fix memory drag in PrimitiveActions, and clean up withAlternateAfterTimeout (#3129)
1293647 is described below
commit 1293647f96829df4c0ea90e815dc2b6fc664de11
Author: Nick Mitchell <st...@users.noreply.github.com>
AuthorDate: Tue Jan 2 17:40:27 2018 -0500
fix memory drag in PrimitiveActions, and clean up withAlternateAfterTimeout (#3129)
---
.../whisk/utils/ExecutionContextFactory.scala | 51 ++++++++++++++++------
1 file changed, 38 insertions(+), 13 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
index 88764d3..ea025cb 100644
--- a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
+++ b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala
@@ -23,38 +23,63 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration.FiniteDuration
-import scala.util.Try
+import scala.util.control.NonFatal
import akka.actor.ActorSystem
-import akka.pattern.{after => expire}
+import akka.actor.Cancellable
+import akka.actor.Scheduler
object ExecutionContextFactory {
- // Future.firstCompletedOf has a memory drag bug
- // https://stackoverflow.com/questions/36420697/about-future-firstcompletedof-and-garbage-collect-mechanism
- def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
+ private type CancellableFuture[T] = (Cancellable, Future[T])
+
+ /**
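+ * Replacement for akka.pattern.after, which has a memory drag issue:
+ * it opaquely schedules the timeout and offers no way to cancel it, so
+ * the scheduled work is retained for the full timeout duration. This
+ * variant also returns the Cancellable so the caller can cancel early.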
+ */
+ def expire[T](duration: FiniteDuration, using: Scheduler)(value: ⇒ Future[T])(
+ implicit ec: ExecutionContext): CancellableFuture[T] = {
val p = Promise[T]()
- val pref = new java.util.concurrent.atomic.AtomicReference(p)
- val completeFirst: Try[T] => Unit = { result: Try[T] =>
- val promise = pref.getAndSet(null)
- if (promise != null) {
- promise.tryComplete(result)
+ val cancellable = using.scheduleOnce(duration) {
+ p completeWith {
+ try value
+ catch { case NonFatal(t) ⇒ Future.failed(t) }
}
}
- futures foreach { _ onComplete completeFirst }
+ (cancellable, p.future)
+ }
+
+ /**
+ * Returns whichever of the two given futures completes first. If f1
+ * completes (successfully or not), the Cancellable paired with f2 is
+ * cancelled; f1 is left untouched when f2 completes first.
+ */
+ def firstCompletedOf2[T](f1: Future[T], f2Cancellable: CancellableFuture[T])(
+ implicit executor: ExecutionContext): Future[T] = {
+ val p = Promise[T]()
+ val (f2Killswitch, f2) = f2Cancellable
+
+ f1.onComplete { result =>
+ p.tryComplete(result)
+ f2Killswitch.cancel()
+ }
+ f2.onComplete(p.tryComplete)
+
p.future
}
implicit class FutureExtensions[T](f: Future[T]) {
def withTimeout(timeout: FiniteDuration, msg: => Throwable)(implicit system: ActorSystem): Future[T] = {
implicit val ec = system.dispatcher
- firstCompletedOf(Seq(f, expire(timeout, system.scheduler)(Future.failed(msg))))
+ firstCompletedOf2(f, expire(timeout, system.scheduler)(Future.failed(msg)))
}
def withAlternativeAfterTimeout(timeout: FiniteDuration, alt: => Future[T])(
implicit system: ActorSystem): Future[T] = {
implicit val ec = system.dispatcher
- firstCompletedOf(Seq(f, expire(timeout, system.scheduler)(alt)))
+ firstCompletedOf2(f, expire(timeout, system.scheduler)(alt))
}
}
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].