You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/09/28 02:48:35 UTC

[incubator-openwhisk] branch master updated: Use non-blocking computation of cache values. (#2797)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 03023a1  Use non-blocking computation of cache values. (#2797)
03023a1 is described below

commit 03023a10b7c9717652330feb0aa73ad1cd38e1bf
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Thu Sep 28 04:48:32 2017 +0200

    Use non-blocking computation of cache values. (#2797)
    
    Currently, every cache lookup calls `putIfAbsent`, which causes thread contention. Instead, this change uses `computeIfAbsent`, which does a `get` first and is therefore non-blocking when reading already-cached values under load.
---
 .../MultipleReadersSingleWriterCache.scala         | 31 +++++++++-------------
 1 file changed, 13 insertions(+), 18 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
index 4d45ea0..ef51097 100644
--- a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
+++ b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
@@ -22,20 +22,15 @@
 
 package whisk.core.database
 
-import java.util.concurrent.ConcurrentMap
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ConcurrentMap, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.Promise
+import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.language.implicitConversions
 import scala.util.Failure
 import scala.util.Success
 import scala.util.control.NonFatal
-
 import com.github.benmanes.caffeine.cache.Caffeine
-
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
@@ -476,18 +471,18 @@ private class ConcurrentMapBackedCache[V](store: ConcurrentMap[Any, Future[V]])
   }
 
   def apply(key: Any, genValue: () => Future[V])(implicit ec: ExecutionContext): Future[V] = {
-    val promise = Promise[V]()
-    store.putIfAbsent(key, promise.future) match {
-      case null =>
-        val future = genValue()
-        future.onComplete { value =>
-          promise.complete(value)
-          // in case of exceptions we remove the cache entry (i.e. try again later)
-          if (value.isFailure) store.remove(key, promise.future)
+    store.computeIfAbsent(
+      key,
+      new java.util.function.Function[Any, Future[V]]() {
+        override def apply(key: Any): Future[V] = {
+          val future = genValue()
+          future.onComplete { value =>
+            // in case of exceptions we remove the cache entry (i.e. try again later)
+            if (value.isFailure) store.remove(key, future)
+          }
+          future
         }
-        future
-      case existingFuture => existingFuture
-    }
+      })
   }
 
   def remove(key: Any) = Option(store.remove(key))

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].