You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/09/28 02:48:35 UTC
[incubator-openwhisk] branch master updated: Use non-blocking computation of cache values. (#2797)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 03023a1 Use non-blocking computation of cache values. (#2797)
03023a1 is described below
commit 03023a10b7c9717652330feb0aa73ad1cd38e1bf
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Thu Sep 28 04:48:32 2017 +0200
Use non-blocking computation of cache values. (#2797)
Currently, each time we try to get a value out of our cache, `putIfAbsent` is called, which causes thread contention. Instead, this change uses `computeIfAbsent`, which does a `get` first and is thus non-blocking when reading cached values under load.
---
.../MultipleReadersSingleWriterCache.scala | 31 +++++++++-------------
1 file changed, 13 insertions(+), 18 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
index 4d45ea0..ef51097 100644
--- a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
+++ b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
@@ -22,20 +22,15 @@
package whisk.core.database
-import java.util.concurrent.ConcurrentMap
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ConcurrentMap, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.Promise
+import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.implicitConversions
import scala.util.Failure
import scala.util.Success
import scala.util.control.NonFatal
-
import com.github.benmanes.caffeine.cache.Caffeine
-
import whisk.common.Logging
import whisk.common.LoggingMarkers
import whisk.common.TransactionId
@@ -476,18 +471,18 @@ private class ConcurrentMapBackedCache[V](store: ConcurrentMap[Any, Future[V]])
}
def apply(key: Any, genValue: () => Future[V])(implicit ec: ExecutionContext): Future[V] = {
- val promise = Promise[V]()
- store.putIfAbsent(key, promise.future) match {
- case null =>
- val future = genValue()
- future.onComplete { value =>
- promise.complete(value)
- // in case of exceptions we remove the cache entry (i.e. try again later)
- if (value.isFailure) store.remove(key, promise.future)
+ store.computeIfAbsent(
+ key,
+ new java.util.function.Function[Any, Future[V]]() {
+ override def apply(key: Any): Future[V] = {
+ val future = genValue()
+ future.onComplete { value =>
+ // in case of exceptions we remove the cache entry (i.e. try again later)
+ if (value.isFailure) store.remove(key, future)
+ }
+ future
}
- future
- case existingFuture => existingFuture
- }
+ })
}
def remove(key: Any) = Option(store.remove(key))
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].