You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/08/31 13:31:18 UTC

[incubator-openwhisk] branch master updated: Add cache invalidation between controllers (#2624)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 78eb1f6  Add cache invalidation between controllers (#2624)
78eb1f6 is described below

commit 78eb1f6f1fb5aae2dd12d1371391695fd92fe3af
Author: Christian Bickel <gi...@cbickel.de>
AuthorDate: Thu Aug 31 15:31:15 2017 +0200

    Add cache invalidation between controllers (#2624)
    
    CUD operations will generate cache invalidation traffic on dedicated message bus topic. These are picked up by other controllers which in turn invalidate their caches.
---
 ansible/roles/kafka/tasks/deploy.yml               |   5 +-
 .../whisk/core/database/DocumentFactory.scala      |  16 +-
 .../MultipleReadersSingleWriterCache.scala         |  48 +++--
 .../core/database/RemoteCacheInvalidation.scala    |  88 ++++++++++
 .../main/scala/whisk/core/entity/CacheKey.scala    |  55 ++++++
 .../main/scala/whisk/core/entity/Identity.scala    |   6 +-
 .../main/scala/whisk/core/entity/WhiskAction.scala |   4 +-
 .../scala/whisk/core/entity/WhiskActivation.scala  |   1 -
 .../scala/whisk/core/entity/WhiskPackage.scala     |   1 -
 .../main/scala/whisk/core/entity/WhiskRule.scala   |   1 -
 .../scala/whisk/core/entity/WhiskTrigger.scala     |   1 -
 .../main/scala/whisk/core/controller/Actions.scala |   4 +
 .../scala/whisk/core/controller/ApiUtils.scala     |   7 +-
 .../scala/whisk/core/controller/Controller.scala   |   7 +
 .../scala/whisk/core/controller/Packages.scala     |   4 +
 .../scala/whisk/core/controller/RestAPIs.scala     |  39 +++--
 .../main/scala/whisk/core/controller/Rules.scala   |   4 +
 .../scala/whisk/core/controller/Triggers.scala     |   6 +-
 .../core/controller/actions/SequenceActions.scala  |   2 +-
 .../core/loadBalancer/LoadBalancerService.scala    |  20 ++-
 .../scala/whisk/core/invoker/InvokerReactive.scala |   2 +-
 .../src/test/scala/ha/CacheInvalidationTests.scala | 179 +++++++++++++++++++
 .../core/controller/test/ActionsApiTests.scala     |   6 +-
 .../core/controller/test/AuthenticateTests.scala   |   4 +-
 .../controller/test/ControllerTestCommon.scala     |   8 +-
 .../MultipleReadersSingleWriterCacheTests.scala    | 195 +++------------------
 .../whisk/core/entity/test/DatastoreTests.scala    |   3 +
 .../whisk/core/entity/test/MigrationEntities.scala |   2 -
 28 files changed, 477 insertions(+), 241 deletions(-)

diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml
index fb0e2b7..615c496 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -51,10 +51,13 @@
   delay: 5
 
 - name: create the health topic
-  shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic health --replication-factor 1 --partitions 1 --zookeeper {{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{ kafka.topics.health.retentionBytes }} --config retention.ms={{ kafka.topics.health.retentionMS }} --config segment.bytes={{ kafka.topics.health.segmentBytes }}'"
+  shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic {{ item }} --replication-factor 1 --partitions 1 --zookeeper {{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{ kafka.topics.health.retentionBytes }} --config retention.ms={{ kafka.topics.health.retentionMS }} --config segment.bytes={{ kafka.topics.health.segmentBytes }}'"
   register: command_result
   failed_when: "not ('Created topic' in command_result.stdout or 'already exists' in command_result.stdout)"
   changed_when: "'Created topic' in command_result.stdout"
+  with_items:
+  - health
+  - cacheInvalidation
 
 - name: create the active-ack topics
   shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic completed{{ item.0 }} --replication-factor 1 --partitions 1 --zookeeper {{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{ kafka.topics.completed.retentionBytes }} --config retention.ms={{ kafka.topics.completed.retentionMS }} --config segment.bytes={{ kafka.topics.completed.segmentBytes }}'"
diff --git a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
index fd3f41b..451bae9 100644
--- a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
@@ -30,6 +30,7 @@ import akka.stream.IOResult
 import akka.stream.scaladsl.StreamConverters
 import spray.json.JsObject
 import whisk.common.TransactionId
+import whisk.core.entity.CacheKey
 import whisk.core.entity.DocId
 import whisk.core.entity.DocInfo
 import whisk.core.entity.DocRevision
@@ -129,10 +130,11 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
      * @param db the datastore client to fetch entity from
      * @param doc the entity to store
      * @param transid the transaction id for logging
+     * @param notifier an optional callback when cache changes
      * @return Future[DocInfo] with completion to DocInfo containing the save document id and revision
      */
     def put[Wsuper >: W](db: ArtifactStore[Wsuper], doc: W)(
-        implicit transid: TransactionId): Future[DocInfo] = {
+        implicit transid: TransactionId, notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
         Try {
             require(db != null, "db undefined")
             require(doc != null, "doc undefined")
@@ -140,7 +142,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
             implicit val logger = db.logging
             implicit val ec = db.executionContext
 
-            val key = cacheKeyForUpdate(doc)
+            val key = CacheKey(doc)
 
             cacheUpdate(doc, key, db.put(doc) map { docinfo =>
                 doc match {
@@ -157,7 +159,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
     }
 
     def attach[Wsuper >: W](db: ArtifactStore[Wsuper], doc: DocInfo, attachmentName: String, contentType: ContentType, bytes: InputStream)(
-        implicit transid: TransactionId): Future[DocInfo] = {
+        implicit transid: TransactionId, notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
 
         Try {
             require(db != null, "db undefined")
@@ -166,7 +168,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
             implicit val logger = db.logging
             implicit val ec = db.executionContext
 
-            val key = doc.id.asDocInfo
+            val key = CacheKey(doc.id.asDocInfo)
             // invalidate the key because attachments update the revision;
             // do not cache the new attachment (controller does not need it)
             cacheInvalidate(key, {
@@ -180,7 +182,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
     }
 
     def del[Wsuper >: W](db: ArtifactStore[Wsuper], doc: DocInfo)(
-        implicit transid: TransactionId): Future[Boolean] = {
+        implicit transid: TransactionId, notifier: Option[CacheChangeNotification]): Future[Boolean] = {
         Try {
             require(db != null, "db undefined")
             require(doc != null, "doc undefined")
@@ -188,7 +190,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
             implicit val logger = db.logging
             implicit val ec = db.executionContext
 
-            val key = doc.id.asDocInfo
+            val key = CacheKey(doc.id.asDocInfo)
             cacheInvalidate(key, db.del(doc))
         } match {
             case Success(f) => f
@@ -221,7 +223,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
             implicit val logger = db.logging
             implicit val ec = db.executionContext
             val key = doc.asDocInfo(rev)
-            _ => cacheLookup(key, db.get[W](key), fromCache)
+            _ => cacheLookup(CacheKey(key), db.get[W](key), fromCache)
         } match {
             case Success(f) => f
             case Failure(t) => Future.failed(t)
diff --git a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
index d824948..f767c60 100644
--- a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
+++ b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
@@ -22,22 +22,24 @@
 
 package whisk.core.database
 
-import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.ConcurrentMap
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
 
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.Promise
+import scala.language.implicitConversions
 import scala.util.Failure
 import scala.util.Success
-import scala.language.implicitConversions
+import scala.util.control.NonFatal
+
+import com.github.benmanes.caffeine.cache.Caffeine
 
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
-import com.github.benmanes.caffeine.cache.Caffeine
-import scala.util.control.NonFatal
+import whisk.core.entity.CacheKey
 
 /**
  * A cache that allows multiple readers, but only a single writer, at
@@ -89,6 +91,8 @@ private object MultipleReadersSingleWriterCache {
     case class StaleRead(actualState: State) extends Exception(s"Attempted read of invalid entry due to $actualState.")
 }
 
+trait CacheChangeNotification extends (CacheKey => Future[Unit])
+
 trait MultipleReadersSingleWriterCache[W, Winfo] {
     import MultipleReadersSingleWriterCache._
     import MultipleReadersSingleWriterCache.State._
@@ -96,9 +100,6 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
     /** Subclasses: Toggle this to enable/disable caching for your entity type. */
     protected val cacheEnabled = true
 
-    /** Subclasses: tell me what key to use for updates. */
-    protected def cacheKeyForUpdate(w: W): Any
-
     private object Entry {
         def apply(transid: TransactionId, state: State, value: Option[Future[W]]): Entry = {
             new Entry(transid, new AtomicReference(state), value)
@@ -155,11 +156,13 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
      * This method posts a delete to the backing store, and either directly invalidates the cache entry
      * or informs any outstanding transaction that it must invalidate the cache on completion.
      */
-    protected def cacheInvalidate[R](key: Any, invalidator: => Future[R])(
-        implicit ec: ExecutionContext, transid: TransactionId, logger: Logging): Future[R] = {
+    protected def cacheInvalidate[R](key: CacheKey, invalidator: => Future[R])(
+        implicit ec: ExecutionContext, transid: TransactionId, logger: Logging, notifier: Option[CacheChangeNotification]): Future[R] = {
         if (cacheEnabled) {
             logger.info(this, s"invalidating $key on delete")
 
+            notifier.foreach(_(key))
+
             // try inserting our desired entry...
             val desiredEntry = Entry(transid, InvalidateInProgress, None)
             cache(key)(desiredEntry) flatMap { actualEntry =>
@@ -210,7 +213,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
     /**
      * This method may initiate a read from the backing store, and potentially stores the result in the cache.
      */
-    protected def cacheLookup[Wsuper >: W](key: Any, generator: => Future[W], fromCache: Boolean = cacheEnabled)(
+    protected def cacheLookup[Wsuper >: W](key: CacheKey, generator: => Future[W], fromCache: Boolean = cacheEnabled)(
         implicit ec: ExecutionContext, transid: TransactionId, logger: Logging): Future[W] = {
         if (fromCache) {
             val promise = Promise[W] // this promise completes with the generator value
@@ -253,9 +256,12 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
     /**
      * This method posts an update to the backing store, and potentially stores the result in the cache.
      */
-    protected def cacheUpdate(doc: W, key: Any, generator: => Future[Winfo])(
-        implicit ec: ExecutionContext, transid: TransactionId, logger: Logging): Future[Winfo] = {
+    protected def cacheUpdate(doc: W, key: CacheKey, generator: => Future[Winfo])(
+        implicit ec: ExecutionContext, transid: TransactionId, logger: Logging, notifier: Option[CacheChangeNotification]): Future[Winfo] = {
         if (cacheEnabled) {
+
+            notifier.foreach(_(key))
+
             // try inserting our desired entry...
             val desiredEntry = Entry(transid, WriteInProgress, Some(Future.successful(doc)))
             cache(key)(desiredEntry) flatMap { actualEntry =>
@@ -288,10 +294,16 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
     def cacheSize: Int = cache.size
 
     /**
+     * This method removes an entry from the cache immediately. You can use this method
+     * if you do not need to perform any updates on the backing store but only to the cache.
+     */
+    protected[database] def removeId(key: CacheKey)(implicit ec: ExecutionContext): Unit = cache.remove(key)
+
+    /**
      * Log a cache hit
      *
      */
-    private def makeNoteOfCacheHit(key: Any)(implicit transid: TransactionId, logger: Logging) = {
+    private def makeNoteOfCacheHit(key: CacheKey)(implicit transid: TransactionId, logger: Logging) = {
         transid.mark(this, LoggingMarkers.DATABASE_CACHE_HIT, s"[GET] serving from cache: $key")(logger)
     }
 
@@ -299,7 +311,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
      * Log a cache miss
      *
      */
-    private def makeNoteOfCacheMiss(key: Any)(implicit transid: TransactionId, logger: Logging) = {
+    private def makeNoteOfCacheMiss(key: CacheKey)(implicit transid: TransactionId, logger: Logging) = {
         transid.mark(this, LoggingMarkers.DATABASE_CACHE_MISS, s"[GET] serving from datastore: $key")(logger)
     }
 
@@ -308,7 +320,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
      * 1. either cache the result if there is no intervening delete or update, or
      * 2. invalidate the cache because there was an intervening delete or update.
      */
-    private def listenForReadDone(key: Any, entry: Entry, generator: => Future[W], promise: Promise[W])(
+    private def listenForReadDone(key: CacheKey, entry: Entry, generator: => Future[W], promise: Promise[W])(
         implicit ec: ExecutionContext, transid: TransactionId, logger: Logging): Unit = {
 
         generator onComplete {
@@ -362,7 +374,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
      * 1. either cache the result if there is no intervening delete or update, or
      * 2. invalidate the cache cache because there was an intervening delete or update
      */
-    private def listenForWriteDone(key: Any, entry: Entry, generator: => Future[Winfo])(
+    private def listenForWriteDone(key: CacheKey, entry: Entry, generator: => Future[Winfo])(
         implicit ec: ExecutionContext, transid: TransactionId, logger: Logging): Future[Winfo] = {
 
         generator andThen {
@@ -398,7 +410,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
     }
 
     /** Immediately invalidates the given entry. */
-    private def invalidateEntry(key: Any, entry: Entry)(
+    private def invalidateEntry(key: CacheKey, entry: Entry)(
         implicit transid: TransactionId, logger: Logging): Unit = {
         logger.info(this, s"invalidating $key")
         entry.invalidate()
@@ -406,7 +418,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
     }
 
     /** Invalidates the given entry after a given invalidator completes. */
-    private def invalidateEntryAfter[R](invalidator: => Future[R], key: Any, entry: Entry)(
+    private def invalidateEntryAfter[R](invalidator: => Future[R], key: CacheKey, entry: Entry)(
         implicit ec: ExecutionContext, transid: TransactionId, logger: Logging): Future[R] = {
 
         entry.grabInvalidationLock()
diff --git a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
new file mode 100644
index 0000000..902bf7d
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database
+
+import java.nio.charset.StandardCharsets
+
+import scala.concurrent.Future
+import scala.concurrent.duration.DurationInt
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Try
+
+import akka.actor.ActorSystem
+import akka.actor.Props
+import spray.json._
+import whisk.common.Logging
+import whisk.core.WhiskConfig
+import whisk.core.connector.Message
+import whisk.core.connector.MessageFeed
+import whisk.core.connector.MessagingProvider
+import whisk.core.entity.CacheKey
+import whisk.core.entity.InstanceId
+import whisk.core.entity.WhiskAction
+import whisk.core.entity.WhiskPackage
+import whisk.core.entity.WhiskRule
+import whisk.core.entity.WhiskTrigger
+import whisk.spi.SpiLoader
+
+case class CacheInvalidationMessage(key: CacheKey, instanceId: String) extends Message {
+    override def serialize = CacheInvalidationMessage.serdes.write(this).compactPrint
+}
+
+object CacheInvalidationMessage extends DefaultJsonProtocol {
+    def parse(msg: String) = Try(serdes.read(msg.parseJson))
+    implicit val serdes = jsonFormat(CacheInvalidationMessage.apply _, "key", "instanceId")
+}
+
+class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance: InstanceId)(implicit logging: Logging, as: ActorSystem) {
+
+    implicit private val ec = as.dispatcher
+
+    private val topic = "cacheInvalidation"
+    private val instanceId = s"$component${instance.toInt}"
+
+    private val msgProvider = SpiLoader.get[MessagingProvider]()
+    private val cacheInvalidationConsumer = msgProvider.getConsumer(config, s"$topic$instanceId", topic, maxPeek = 128)
+    private val cacheInvalidationProducer = msgProvider.getProducer(config, ec)
+
+    def notifyOtherInstancesAboutInvalidation(key: CacheKey): Future[Unit] = {
+        cacheInvalidationProducer.send(topic, CacheInvalidationMessage(key, instanceId)).map(_ => Unit)
+    }
+
+    private val invalidationFeed = as.actorOf(Props {
+        new MessageFeed("cacheInvalidation", logging, cacheInvalidationConsumer, cacheInvalidationConsumer.maxPeek, 1.second, removeFromLocalCache)
+    })
+
+    private def removeFromLocalCache(bytes: Array[Byte]): Future[Unit] = Future {
+        val raw = new String(bytes, StandardCharsets.UTF_8)
+
+        CacheInvalidationMessage.parse(raw) match {
+            case Success(msg: CacheInvalidationMessage) => {
+                if (msg.instanceId != instanceId) {
+                    WhiskAction.removeId(msg.key)
+                    WhiskPackage.removeId(msg.key)
+                    WhiskRule.removeId(msg.key)
+                    WhiskTrigger.removeId(msg.key)
+                }
+            }
+            case Failure(t) => logging.error(this, s"failed processing message: $raw with $t")
+        }
+        invalidationFeed ! MessageFeed.Processed
+    }
+}
diff --git a/common/scala/src/main/scala/whisk/core/entity/CacheKey.scala b/common/scala/src/main/scala/whisk/core/entity/CacheKey.scala
new file mode 100644
index 0000000..de0f6fe
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/entity/CacheKey.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.entity
+
+import spray.json.DefaultJsonProtocol
+
+class UnsupportedCacheKeyTypeException(msg: String) extends Exception(msg)
+
+/**
+ * A key that is used to store an entity on the cache.
+ *
+ * @param mainId The main part for the key to be used. For example this is the id of a document.
+ * @param secondaryId A second part of the key. For example the revision of an entity. This part
+ * of the key will not be written to the logs.
+ */
+case class CacheKey(mainId: String, secondaryId: Option[String]) {
+    override def toString() = {
+        s"CacheKey($mainId)"
+    }
+}
+
+object CacheKey extends DefaultJsonProtocol {
+    implicit val serdes = jsonFormat2(CacheKey.apply)
+
+    def apply(key: Any): CacheKey = {
+        key match {
+            case e: EntityName => CacheKey(e.asString, None)
+            case a: AuthKey    => CacheKey(a.uuid.asString, Some(a.key.asString))
+            case d: DocInfo => {
+                val revision = if (d.rev.empty) None else Some(d.rev.asString)
+                CacheKey(d.id.asString, revision)
+            }
+            case w: WhiskEntity => CacheKey(w.docid.asDocInfo)
+            case s: String      => CacheKey(s, None)
+            case others => {
+                throw new UnsupportedCacheKeyTypeException(s"Unable to apply the entity ${others.getClass} on CacheKey.")
+            }
+        }
+    }
+}
diff --git a/common/scala/src/main/scala/whisk/core/entity/Identity.scala b/common/scala/src/main/scala/whisk/core/entity/Identity.scala
index d272f7a..32093c8 100644
--- a/common/scala/src/main/scala/whisk/core/entity/Identity.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/Identity.scala
@@ -45,7 +45,6 @@ object Identity extends MultipleReadersSingleWriterCache[Identity, DocInfo] with
     private val viewName = "subjects/identities"
 
     override val cacheEnabled = true
-    override def cacheKeyForUpdate(i: Identity) = i.authkey
     implicit val serdes = jsonFormat5(Identity.apply)
 
     /**
@@ -58,8 +57,9 @@ object Identity extends MultipleReadersSingleWriterCache[Identity, DocInfo] with
         implicit val logger: Logging = datastore.logging
         implicit val ec = datastore.executionContext
         val ns = namespace.asString
+        val key = CacheKey(namespace)
 
-        cacheLookup(ns, {
+        cacheLookup(key, {
             list(datastore, List(ns), limit = 1) map { list =>
                 list.length match {
                     case 1 =>
@@ -80,7 +80,7 @@ object Identity extends MultipleReadersSingleWriterCache[Identity, DocInfo] with
         implicit val logger: Logging = datastore.logging
         implicit val ec = datastore.executionContext
 
-        cacheLookup(authkey, {
+        cacheLookup(CacheKey(authkey), {
             list(datastore, List(authkey.uuid.asString, authkey.key.asString)) map { list =>
                 list.length match {
                     case 1 =>
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
index c4e90f4..f788ac4 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
@@ -30,6 +30,7 @@ import spray.json.DefaultJsonProtocol._
 import whisk.common.TransactionId
 import whisk.core.database.ArtifactStore
 import whisk.core.database.DocumentFactory
+import whisk.core.database.CacheChangeNotification
 import whisk.core.entity.Attachments._
 import whisk.core.entity.types.EntityStore
 
@@ -215,11 +216,10 @@ object WhiskAction
     override implicit val serdes = jsonFormat(WhiskAction.apply, "namespace", "name", "exec", "parameters", "limits", "version", "publish", "annotations")
 
     override val cacheEnabled = true
-    override def cacheKeyForUpdate(w: WhiskAction) = w.docid.asDocInfo
 
     // overriden to store attached code
     override def put[A >: WhiskAction](db: ArtifactStore[A], doc: WhiskAction)(
-        implicit transid: TransactionId): Future[DocInfo] = {
+        implicit transid: TransactionId, notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
 
         Try {
             require(db != null, "db undefined")
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala
index b4f2f89..0fe0dcd 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala
@@ -117,5 +117,4 @@ object WhiskActivation
     // Caching activations doesn't make much sense in the common case as usually,
     // an activation is only asked for once.
     override val cacheEnabled = false
-    override def cacheKeyForUpdate(w: WhiskActivation) = w.docid.asDocInfo
 }
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskPackage.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskPackage.scala
index e29f03f..ac73147 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskPackage.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskPackage.scala
@@ -198,7 +198,6 @@ object WhiskPackage
     }
 
     override val cacheEnabled = true
-    override def cacheKeyForUpdate(w: WhiskPackage) = w.docid.asDocInfo
 }
 
 /**
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskRule.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskRule.scala
index c91cc9c..749b6f5 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskRule.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskRule.scala
@@ -233,7 +233,6 @@ object WhiskRule
     }
 
     override val cacheEnabled = false
-    override def cacheKeyForUpdate(w: WhiskRule) = w.docid.asDocInfo
 }
 
 object WhiskRuleResponse extends DefaultJsonProtocol {
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskTrigger.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskTrigger.scala
index 128a7e5..d2271c4 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskTrigger.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskTrigger.scala
@@ -119,7 +119,6 @@ object WhiskTrigger
     override implicit val serdes = jsonFormat8(WhiskTrigger.apply)
 
     override val cacheEnabled = true
-    override def cacheKeyForUpdate(w: WhiskTrigger) = w.docid.asDocInfo
 }
 
 object WhiskTriggerPut extends DefaultJsonProtocol {
diff --git a/core/controller/src/main/scala/whisk/core/controller/Actions.scala b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
index ba74de8..d4a59cc 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Actions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
@@ -39,6 +39,7 @@ import spray.json.DefaultJsonProtocol._
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
 import whisk.core.controller.actions.PostActionActivation
+import whisk.core.database.CacheChangeNotification
 import whisk.core.database.NoDocumentException
 import whisk.core.entitlement._
 import whisk.core.entity._
@@ -84,6 +85,9 @@ trait WhiskActionsApi
     /** Database service to CRUD actions. */
     protected val entityStore: EntityStore
 
+    /** Notification service for cache invalidation. */
+    protected implicit val cacheChangeNotification: Some[CacheChangeNotification]
+
     /** Database service to get activations. */
     protected val activationStore: ActivationStore
 
diff --git a/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala b/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala
index a52bd24..e6f3c51 100644
--- a/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala
@@ -22,22 +22,20 @@ import scala.concurrent.Future
 import scala.util.Failure
 import scala.util.Success
 
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
 import akka.http.scaladsl.model.StatusCode
 import akka.http.scaladsl.model.StatusCodes.Conflict
 import akka.http.scaladsl.model.StatusCodes.InternalServerError
 import akka.http.scaladsl.model.StatusCodes.NotFound
 import akka.http.scaladsl.model.StatusCodes.OK
-import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
 import akka.http.scaladsl.server.Directives
 import akka.http.scaladsl.server.RequestContext
 import akka.http.scaladsl.server.RouteResult
-
 import spray.json.DefaultJsonProtocol._
 import spray.json.JsBoolean
 import spray.json.JsObject
 import spray.json.JsValue
 import spray.json.RootJsonFormat
-
 import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.controller.PostProcess.PostProcessEntity
@@ -46,6 +44,7 @@ import whisk.core.database.ArtifactStoreException
 import whisk.core.database.DocumentConflictException
 import whisk.core.database.DocumentFactory
 import whisk.core.database.DocumentTypeMismatchException
+import whisk.core.database.CacheChangeNotification
 import whisk.core.database.NoDocumentException
 import whisk.core.entity.DocId
 import whisk.core.entity.WhiskDocument
@@ -276,6 +275,7 @@ trait WriteOps extends Directives {
         postProcess: Option[PostProcessEntity[A]] = None)(
             implicit transid: TransactionId,
             format: RootJsonFormat[A],
+            notifier: Option[CacheChangeNotification],
             ma: Manifest[A]) = {
         // marker to return an existing doc with status OK rather than conflict if overwrite is false
         case class IdentityPut(self: A) extends Throwable
@@ -347,6 +347,7 @@ trait WriteOps extends Directives {
         postProcess: Option[PostProcessEntity[A]] = None)(
             implicit transid: TransactionId,
             format: RootJsonFormat[A],
+            notifier: Option[CacheChangeNotification],
             ma: Manifest[A]) = {
         onComplete(factory.get(datastore, docid) flatMap {
             entity =>
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 90d4023..4037e0d 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -29,11 +29,14 @@ import akka.http.scaladsl.server.Route
 import akka.stream.ActorMaterializer
 import spray.json._
 import spray.json.DefaultJsonProtocol._
+
 import whisk.common.AkkaLogging
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
+import whisk.core.database.RemoteCacheInvalidation
+import whisk.core.database.CacheChangeNotification
 import whisk.core.entitlement._
 import whisk.core.entity._
 import whisk.core.entity.ActivationId.ActivationIdGenerator
@@ -96,6 +99,10 @@ class Controller(
     private implicit val authStore = WhiskAuthStore.datastore(whiskConfig)
     private implicit val entityStore = WhiskEntityStore.datastore(whiskConfig)
     private implicit val activationStore = WhiskActivationStore.datastore(whiskConfig)
+    private implicit val cacheChangeNotification = Some(new CacheChangeNotification {
+        val remoteCacheInvalidaton = new RemoteCacheInvalidation(whiskConfig, "controller", instance)
+        override def apply(k: CacheKey) = remoteCacheInvalidaton.notifyOtherInstancesAboutInvalidation(k)
+    })
 
     // initialize backend services
     private implicit val loadBalancer = new LoadBalancerService(whiskConfig, instance, entityStore)
diff --git a/core/controller/src/main/scala/whisk/core/controller/Packages.scala b/core/controller/src/main/scala/whisk/core/controller/Packages.scala
index 925d5c1..cca510e 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Packages.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Packages.scala
@@ -31,6 +31,7 @@ import spray.json._
 
 import whisk.common.TransactionId
 import whisk.core.database.DocumentTypeMismatchException
+import whisk.core.database.CacheChangeNotification
 import whisk.core.database.NoDocumentException
 import whisk.core.entitlement._
 import whisk.core.entity._
@@ -46,6 +47,9 @@ trait WhiskPackagesApi extends WhiskCollectionAPI with ReferencedEntities {
     /** Database service to CRUD packages. */
     protected val entityStore: EntityStore
 
+    /** Notification service for cache invalidation. */
+    protected implicit val cacheChangeNotification: Some[CacheChangeNotification]
+
     /** Route directives for API. The methods that are supported on packages. */
     protected override lazy val entityOps = put | get | delete
 
diff --git a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
index 0e374f6..f75750e 100644
--- a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
@@ -17,31 +17,31 @@
 
 package whisk.core.controller
 
+import scala.concurrent.ExecutionContext
+
 import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
 import akka.http.scaladsl.model.StatusCodes._
 import akka.http.scaladsl.model.Uri
+import akka.http.scaladsl.model.headers._
 import akka.http.scaladsl.server.Directives
 import akka.http.scaladsl.server.Route
 import akka.http.scaladsl.model.headers._
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.stream.ActorMaterializer
 
 import spray.json._
 import spray.json.DefaultJsonProtocol._
-
-import scala.concurrent.ExecutionContext
-
+import whisk.common.Logging
 import whisk.common.TransactionId
+import whisk.core.database.CacheChangeNotification
 import whisk.core.WhiskConfig
 import whisk.core.WhiskConfig.whiskVersionBuildno
 import whisk.core.WhiskConfig.whiskVersionDate
-import whisk.core.entity.WhiskAuthStore
-import whisk.common.Logging
-import whisk.common.TransactionId
-import whisk.core.entity._
-import whisk.core.entity.types._
 import whisk.core.entitlement._
+import whisk.core.entity._
 import whisk.core.entity.ActivationId.ActivationIdGenerator
+import whisk.core.entity.WhiskAuthStore
+import whisk.core.entity.types._
 import whisk.core.loadBalancer.LoadBalancerService
 
 /**
@@ -89,10 +89,10 @@ protected[controller] object RestApiCommons {
         Authenticate.requiredProperties ++
         Collection.requiredProperties
 
+    import akka.http.scaladsl.model.HttpCharsets
+    import akka.http.scaladsl.model.MediaTypes.`application/json`
     import akka.http.scaladsl.unmarshalling.FromEntityUnmarshaller
     import akka.http.scaladsl.unmarshalling.Unmarshaller
-    import akka.http.scaladsl.model.MediaTypes.`application/json`
-    import akka.http.scaladsl.model.HttpCharsets
 
     /**
      * Extract an empty entity into a JSON object. This is useful for the
@@ -140,6 +140,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
     implicit val entitlementProvider: EntitlementProvider,
     implicit val activationIdFactory: ActivationIdGenerator,
     implicit val loadBalancer: LoadBalancerService,
+    implicit val cacheChangeNotification: Some[CacheChangeNotification],
     implicit val activationStore: ActivationStore,
     implicit val whiskConfig: WhiskConfig)
     extends SwaggerDocs(Uri.Path(apiPath) / apiVersion, "apiv1swagger.json")
@@ -208,12 +209,12 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
     private val web = new WebActionsApi(Seq("web"), this.WebApiDirectives)
 
     class NamespacesApi(
-       val apiPath: String,
-       val apiVersion: String)(
-       implicit override val entityStore: EntityStore,
-       override val entitlementProvider: EntitlementProvider,
-       override val executionContext: ExecutionContext,
-       override val logging: Logging)
+        val apiPath: String,
+        val apiVersion: String)(
+        implicit override val entityStore: EntityStore,
+        override val entitlementProvider: EntitlementProvider,
+        override val executionContext: ExecutionContext,
+        override val logging: Logging)
     extends WhiskNamespacesApi
 
     class ActionsApi(
@@ -226,6 +227,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
         override val entitlementProvider: EntitlementProvider,
         override val activationIdFactory: ActivationIdGenerator,
         override val loadBalancer: LoadBalancerService,
+        override val cacheChangeNotification: Some[CacheChangeNotification],
         override val executionContext: ExecutionContext,
         override val logging: Logging,
         override val whiskConfig: WhiskConfig)
@@ -250,6 +252,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
         override val entitlementProvider: EntitlementProvider,
         override val activationIdFactory: ActivationIdGenerator,
         override val loadBalancer: LoadBalancerService,
+        override val cacheChangeNotification: Some[CacheChangeNotification],
         override val executionContext: ExecutionContext,
         override val logging: Logging,
         override val whiskConfig: WhiskConfig)
@@ -263,6 +266,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
         override val entitlementProvider: EntitlementProvider,
         override val activationIdFactory: ActivationIdGenerator,
         override val loadBalancer: LoadBalancerService,
+        override val cacheChangeNotification: Some[CacheChangeNotification],
         override val executionContext: ExecutionContext,
         override val logging: Logging,
         override val whiskConfig: WhiskConfig)
@@ -277,6 +281,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
         override val activationStore: ActivationStore,
         override val activationIdFactory: ActivationIdGenerator,
         override val loadBalancer: LoadBalancerService,
+        override val cacheChangeNotification: Some[CacheChangeNotification],
         override val executionContext: ExecutionContext,
         override val logging: Logging,
         override val whiskConfig: WhiskConfig,
diff --git a/core/controller/src/main/scala/whisk/core/controller/Rules.scala b/core/controller/src/main/scala/whisk/core/controller/Rules.scala
index e65c845..6f657b2 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Rules.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Rules.scala
@@ -30,6 +30,7 @@ import spray.json.DeserializationException
 
 import whisk.common.TransactionId
 import whisk.core.database.DocumentConflictException
+import whisk.core.database.CacheChangeNotification
 import whisk.core.database.NoDocumentException
 import whisk.core.entity._
 import whisk.core.entity.types.EntityStore
@@ -54,6 +55,9 @@ trait WhiskRulesApi extends WhiskCollectionAPI with ReferencedEntities {
     /** JSON response formatter. */
     import RestApiCommons.jsonDefaultResponsePrinter
 
+    /** Notification service for cache invalidation. */
+    protected implicit val cacheChangeNotification: Some[CacheChangeNotification]
+
     /** Path to Rules REST API. */
     protected val rulesPath = "rules"
 
diff --git a/core/controller/src/main/scala/whisk/core/controller/Triggers.scala b/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
index d77d981..c9d1444 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
@@ -45,6 +45,7 @@ import spray.json._
 import spray.json.DefaultJsonProtocol.RootJsObjectFormat
 
 import whisk.common.TransactionId
+import whisk.core.database.CacheChangeNotification
 import whisk.core.entitlement.Collection
 import whisk.core.entity.ActivationResponse
 import whisk.core.entity.EntityPath
@@ -57,9 +58,9 @@ import whisk.core.entity.WhiskTrigger
 import whisk.core.entity.WhiskTriggerPut
 import whisk.core.entity.types.ActivationStore
 import whisk.core.entity.types.EntityStore
-import whisk.http.ErrorResponse.terminate
 import whisk.core.entity.Identity
 import whisk.core.entity.FullyQualifiedEntityName
+import whisk.http.ErrorResponse.terminate
 
 /** A trait implementing the triggers API. */
 trait WhiskTriggersApi extends WhiskCollectionAPI {
@@ -73,6 +74,9 @@ trait WhiskTriggersApi extends WhiskCollectionAPI {
     /** Database service to CRUD triggers. */
     protected val entityStore: EntityStore
 
+    /** Notification service for cache invalidation. */
+    protected implicit val cacheChangeNotification: Some[CacheChangeNotification]
+
     /** Database service to get activations. */
     protected val activationStore: ActivationStore
 
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
index 55bfac4..e60e1dd 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
@@ -160,7 +160,7 @@ protected[actions] trait SequenceActions {
      */
     private def storeSequenceActivation(activation: WhiskActivation)(implicit transid: TransactionId): Unit = {
         logging.info(this, s"recording activation '${activation.activationId}'")
-        WhiskActivation.put(activationStore, activation) onComplete {
+        WhiskActivation.put(activationStore, activation)(transid, notifier = None) onComplete {
             case Success(id) => logging.info(this, s"recorded activation")
             case Failure(t)  => logging.error(this, s"failed to record activation ${activation.activationId} with error ${t.getLocalizedMessage}")
         }
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index b4c79ee..07ca455 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -18,6 +18,8 @@
 package whisk.core.loadBalancer
 
 import java.nio.charset.StandardCharsets
+
+import scala.annotation.tailrec
 import scala.concurrent.Await
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
@@ -25,31 +27,33 @@ import scala.concurrent.Promise
 import scala.concurrent.duration.DurationInt
 import scala.util.Failure
 import scala.util.Success
+
 import org.apache.kafka.clients.producer.RecordMetadata
+
 import akka.actor.ActorRefFactory
 import akka.actor.ActorSystem
 import akka.actor.Props
-import akka.pattern.ask
 import akka.util.Timeout
+import akka.pattern.ask
+
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
 import whisk.core.WhiskConfig._
-import whisk.core.connector.MessagingProvider
 import whisk.core.connector.{ ActivationMessage, CompletionMessage }
 import whisk.core.connector.MessageFeed
 import whisk.core.connector.MessageProducer
+import whisk.core.connector.MessagingProvider
 import whisk.core.database.NoDocumentException
 import whisk.core.entity.{ ActivationId, WhiskActivation }
-import whisk.core.entity.InstanceId
+import whisk.core.entity.EntityName
 import whisk.core.entity.ExecutableWhiskAction
+import whisk.core.entity.Identity
+import whisk.core.entity.InstanceId
 import whisk.core.entity.UUID
 import whisk.core.entity.WhiskAction
 import whisk.core.entity.types.EntityStore
-import scala.annotation.tailrec
-import whisk.core.entity.EntityName
-import whisk.core.entity.Identity
 import whisk.spi.SpiLoader
 
 trait LoadBalancer {
@@ -162,9 +166,9 @@ class LoadBalancerService(
     private def createTestActionForInvokerHealth(db: EntityStore, action: WhiskAction): Future[Unit] = {
         implicit val tid = TransactionId.loadbalancer
         WhiskAction.get(db, action.docid).flatMap { oldAction =>
-            WhiskAction.put(db, action.revision(oldAction.rev))
+            WhiskAction.put(db, action.revision(oldAction.rev))(tid, notifier = None)
         }.recover {
-            case _: NoDocumentException => WhiskAction.put(db, action)
+            case _: NoDocumentException => WhiskAction.put(db, action)(tid, notifier = None)
         }.map(_ => {}).andThen {
             case Success(_) => logging.info(this, "test action for invoker health now exists")
             case Failure(e) => logging.error(this, s"error creating test action for invoker health: $e")
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 1f64625..0f8a2cf 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -140,7 +140,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
     val store = (tid: TransactionId, activation: WhiskActivation) => {
         implicit val transid = tid
         logging.info(this, "recording the activation result to the data store")
-        WhiskActivation.put(activationStore, activation).andThen {
+        WhiskActivation.put(activationStore, activation)(tid, notifier = None).andThen {
             case Success(id) => logging.info(this, s"recorded activation")
             case Failure(t)  => logging.error(this, s"failed to record activation")
         }
diff --git a/tests/src/test/scala/ha/CacheInvalidationTests.scala b/tests/src/test/scala/ha/CacheInvalidationTests.scala
new file mode 100644
index 0000000..ecda623
--- /dev/null
+++ b/tests/src/test/scala/ha/CacheInvalidationTests.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ha
+
+import scala.concurrent.Await
+import scala.concurrent.duration.DurationInt
+
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.marshalling.Marshal
+import akka.http.scaladsl.model.HttpMethods
+import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.model.RequestEntity
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.Uri
+import akka.http.scaladsl.model.headers.Authorization
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import akka.stream.ActorMaterializer
+import common.WhiskProperties
+import common.WskActorSystem
+import common.WskTestHelpers
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+import whisk.core.WhiskConfig
+import akka.http.scaladsl.model.StatusCode
+
+@RunWith(classOf[JUnitRunner])
+class CacheInvalidationTests
+    extends FlatSpec
+    with Matchers
+    with WskTestHelpers
+    with WskActorSystem {
+
+    implicit val materializer = ActorMaterializer()
+
+    val hosts = WhiskProperties.getProperty("controller.hosts").split(",")
+    val authKey = WhiskProperties.readAuthKey(WhiskProperties.getAuthFileForTesting)
+
+    val timeout = 15.seconds
+
+    def retry[T](fn: => T) = whisk.utils.retry(fn, 15, Some(1.second))
+
+    def updateAction(name: String, code: String, controllerInstance: Int = 0) = {
+        require(controllerInstance >= 0 && controllerInstance < hosts.length, "Controller instance not known.")
+
+        val host = hosts(controllerInstance)
+        val port = WhiskProperties.getControllerBasePort + controllerInstance
+
+        val body = JsObject("namespace" -> JsString("_"), "name" -> JsString(name), "exec" -> JsObject("kind" -> JsString("nodejs:default"), "code" -> JsString(code)))
+
+        val request = Marshal(body).to[RequestEntity].flatMap { entity =>
+            Http().singleRequest(HttpRequest(
+                method = HttpMethods.PUT,
+                uri = Uri().withScheme("http").withHost(host).withPort(port).withPath(Uri.Path(s"/api/v1/namespaces/_/actions/$name")).withQuery(Uri.Query("overwrite" -> true.toString)),
+                headers = List(Authorization(BasicHttpCredentials(authKey.split(":")(0), authKey.split(":")(1)))),
+                entity = entity)).flatMap { response =>
+                val action = Unmarshal(response).to[JsObject].map { resBody =>
+                    withClue(s"Error in Body: $resBody")(response.status shouldBe StatusCodes.OK)
+                    resBody
+                }
+                action
+            }
+        }
+
+        Await.result(request, timeout)
+    }
+
+    def getAction(name: String, controllerInstance: Int = 0, expectedCode: StatusCode = StatusCodes.OK) = {
+        require(controllerInstance >= 0 && controllerInstance < hosts.length, "Controller instance not known.")
+
+        val host = hosts(controllerInstance)
+        val port = WhiskProperties.getControllerBasePort + controllerInstance
+
+        val request = Http().singleRequest(HttpRequest(
+            method = HttpMethods.GET,
+            uri = Uri().withScheme("http").withHost(host).withPort(port).withPath(Uri.Path(s"/api/v1/namespaces/_/actions/$name")),
+            headers = List(Authorization(BasicHttpCredentials(authKey.split(":")(0), authKey.split(":")(1)))))).flatMap { response =>
+            val action = Unmarshal(response).to[JsObject].map { resBody =>
+                withClue(s"Wrong statuscode from controller. Body is: $resBody")(response.status shouldBe expectedCode)
+                resBody
+            }
+            action
+        }
+
+        Await.result(request, timeout)
+    }
+
+    def deleteAction(name: String, controllerInstance: Int = 0, expectedCode: Option[StatusCode] = Some(StatusCodes.OK)) = {
+        require(controllerInstance >= 0 && controllerInstance < hosts.length, "Controller instance not known.")
+
+        val host = hosts(controllerInstance)
+        val port = WhiskProperties.getControllerBasePort + controllerInstance
+
+        val request = Http().singleRequest(HttpRequest(
+            method = HttpMethods.DELETE,
+            uri = Uri().withScheme("http").withHost(host).withPort(port).withPath(Uri.Path(s"/api/v1/namespaces/_/actions/$name")),
+            headers = List(Authorization(BasicHttpCredentials(authKey.split(":")(0), authKey.split(":")(1)))))).flatMap { response =>
+            val action = Unmarshal(response).to[JsObject].map { resBody =>
+                expectedCode.map { code =>
+                    withClue(s"Wrong statuscode from controller. Body is: $resBody")(response.status shouldBe code)
+                }
+                resBody
+            }
+            action
+        }
+
+        Await.result(request, timeout)
+    }
+
+    behavior of "the cache"
+
+    it should "be invalidated on updating an entity" in {
+        if (WhiskProperties.getProperty(WhiskConfig.controllerInstances).toInt >= 2) {
+            val actionName = "invalidateRemoteCacheOnUpdate"
+
+            deleteAction(actionName, 0, None)
+            deleteAction(actionName, 1, None)
+
+            // Create an action on controller0
+            val createdAction = updateAction(actionName, "CODE_CODE_CODE", 0)
+
+            // Get action from controller1
+            val actionFromController1 = getAction(actionName, 1)
+            createdAction shouldBe actionFromController1
+
+            // Update the action on controller0
+            val updatedAction = updateAction(actionName, "CODE_CODE", 0)
+
+            retry({
+                // Get action from controller1
+                val updatedActionFromController1 = getAction(actionName, 1)
+                updatedAction shouldBe updatedActionFromController1
+            })
+        }
+    }
+
+    it should "be invalidated on deleting an entity" in {
+        if (WhiskProperties.getProperty(WhiskConfig.controllerInstances).toInt >= 2) {
+            val actionName = "invalidateRemoteCacheOnDelete"
+
+            deleteAction(actionName, 0, None)
+            deleteAction(actionName, 1, None)
+
+            // Create an action on controller0
+            val createdAction = updateAction(actionName, "CODE_CODE_CODE", 0)
+            // Get action from controller1 (Now its in the cache of controller 0 and 1)
+            val actionFromController1 = getAction(actionName, 1)
+            createdAction shouldBe actionFromController1
+
+            retry({
+                // Delete the action on controller0 (It should be deleted automatically from the cache of controller1)
+                val updatedAction = deleteAction(actionName, 0)
+                // Get action from controller1 should fail with 404
+                getAction(actionName, 1, StatusCodes.NotFound)
+            })
+        }
+    }
+}
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
index 150a04e..1aae330 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
@@ -469,7 +469,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
                 action.parameters, action.limits, action.version,
                 action.publish, action.annotations ++ Parameters(WhiskAction.execFieldName, NODEJS6)))
         }
-        stream.toString should include regex (s"caching*.*${action.docid.asDocInfo}")
+        stream.toString should include(s"caching ${CacheKey(action)}")
         stream.reset()
 
         // second request should fetch from cache
@@ -481,7 +481,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
                 action.publish, action.annotations ++ Parameters(WhiskAction.execFieldName, NODEJS6)))
         }
 
-        stream.toString should include regex (s"serving from cache:*.*${action.docid.asDocInfo}")
+        stream.toString should include(s"serving from cache: ${CacheKey(action)}")
         stream.reset()
 
         // delete should invalidate cache
@@ -492,7 +492,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
                 action.parameters, action.limits, action.version,
                 action.publish, action.annotations ++ Parameters(WhiskAction.execFieldName, NODEJS6)))
         }
-        stream.toString should include regex (s"invalidating*.*${action.docid.asDocInfo}")
+        stream.toString should include(s"invalidating ${CacheKey(action)}")
         stream.reset()
     }
 
diff --git a/tests/src/test/scala/whisk/core/controller/test/AuthenticateTests.scala b/tests/src/test/scala/whisk/core/controller/test/AuthenticateTests.scala
index e964856..2c6415b 100644
--- a/tests/src/test/scala/whisk/core/controller/test/AuthenticateTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/AuthenticateTests.scala
@@ -62,14 +62,14 @@ class AuthenticateTests extends ControllerTestCommon with Authenticate {
             user.get shouldBe Identity(subject, ns.name, ns.authkey, Privilege.ALL)
 
             // first lookup should have been from datastore
-            stream.toString should include regex (s"serving from datastore: ${ns.authkey.uuid.asString}")
+            stream.toString should include(s"serving from datastore: ${CacheKey(ns.authkey)}")
             stream.reset()
 
             // repeat query, now should be served from cache
             val cachedUser = Await.result(validateCredentials(Some(pass))(transid()), dbOpTimeout)
             cachedUser.get shouldBe Identity(subject, ns.name, ns.authkey, Privilege.ALL)
 
-            stream.toString should include regex (s"serving from cache: ${ns.authkey.uuid.asString}")
+            stream.toString should include(s"serving from cache: ${CacheKey(ns.authkey)}")
             stream.reset()
         }
 
diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index dbfa17f..f899e81 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -42,6 +42,7 @@ import whisk.core.connector.ActivationMessage
 import whisk.core.controller.RestApiCommons
 import whisk.core.controller.WhiskServices
 import whisk.core.database.DocumentFactory
+import whisk.core.database.CacheChangeNotification
 import whisk.core.database.test.DbUtils
 import whisk.core.entitlement._
 import whisk.core.entity._
@@ -85,6 +86,12 @@ protected trait ControllerTestCommon
         override def make = fixedId
     }
 
+    implicit val cacheChangeNotification = Some {
+        new CacheChangeNotification {
+            override def apply(k: CacheKey): Future[Unit] = Future.successful(())
+        }
+    }
+
     val entityStore = WhiskEntityStore.datastore(whiskConfig)
     val activationStore = WhiskActivationStore.datastore(whiskConfig)
     val authStore = WhiskAuthStore.datastore(whiskConfig)
@@ -169,7 +176,6 @@ protected trait ControllerTestCommon
         with DefaultJsonProtocol {
         implicit val serdes = jsonFormat5(BadEntity.apply)
         override val cacheEnabled = true
-        override def cacheKeyForUpdate(w: BadEntity) = w.docid.asDocInfo
     }
 }
 
diff --git a/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala b/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
index f59c5f1..6a8f272 100644
--- a/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
+++ b/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
@@ -17,197 +17,58 @@
 
 package whisk.core.database.test
 
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.Executors
-import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
-import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
-import scala.language.postfixOps
-import scala.util.Failure
-import scala.util.Success
 
+import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
 
 import common.StreamLogging
 import common.WskActorSystem
-import whisk.common.Logging
 import whisk.common.TransactionId
+import whisk.core.database.CacheChangeNotification
 import whisk.core.database.MultipleReadersSingleWriterCache
+import whisk.core.entity.CacheKey
 
-class MultipleReadersSingleWriterCacheTests(nIters: Int = 3) extends FlatSpec
+@RunWith(classOf[JUnitRunner])
+class MultipleReadersSingleWriterCacheTests extends FlatSpec
     with Matchers
     with MultipleReadersSingleWriterCache[String, String]
     with WskActorSystem
     with StreamLogging {
 
-    "the cache" should "support simple CRUD" in {
-        val inhibits = doReadWriteRead("foo").go(0 seconds)
-        inhibits.debug(this)
+    behavior of "the cache"
 
-        inhibits.nReadInhibits.get should be(0)
-        cacheSize should be(1)
-    }
-
-    "the cache" should "support concurrent CRUD to different keys" in {
-        //
-        // for the first iter, all reads are not-cached and each thread
-        // requests a different key, so we expect no read inhibits, and a
-        // bunch of write inhibits
-        //
-        val inhibits = doCRUD("CONCURRENT CRUD to different keys", { i => "foop_" + i })
-        inhibits.nReadInhibits.get should be(0)
-        inhibits.nWriteInhibits.get should not be (0)
-
-        //
-        // after the first iter, the keys already exist, so the first read
-        // should be cached, resulting in the writes proceeding more
-        // smoothly this time, thus inhibiting some of the second reads
-        //
-        for (i <- 1 to nIters - 1) {
-            doCRUD("CONCURRENT CRUD to different keys", { i => "foop_" + i })
-                .nReadInhibits.get should not be (0)
-        }
-    }
-
-    "the cache" should "support concurrent CRUD to shared keys" in {
-        for (i <- 1 to nIters) {
-            doCRUD("CONCURRENT CRUD to shared keys", sharedKeys)
-                .nWriteInhibits.get should not be (0)
-        }
-    }
-
-    "the cache" should "support concurrent CRUD to shared keys (zero latency)" in {
-        var hasInhibits = false
-        for (i <- 1 to nIters) {
-            hasInhibits = doCRUD("concurrent CRUD to shared keys (zero latency)", sharedKeys, 0 seconds)
-                .hasInhibits
-        }
-        hasInhibits should not be (false)
-    }
-
-    "the cache" should "support concurrent CRUD to shared keys (short latency)" in {
-        for (i <- 1 to nIters) {
-            doCRUD("concurrent CRUD to shared keys (short latency)", sharedKeys, 10 milliseconds)
-                .hasInhibits should be(true)
-        }
-    }
-
-    "the cache" should "support concurrent CRUD to shared keys (medium latency)" in {
-        for (i <- 1 to nIters) {
-            doCRUD("concurrent CRUD to shared keys (medium latency)", sharedKeys, 100 milliseconds)
-                .hasInhibits should be(true)
-        }
-    }
-
-    "the cache" should "support concurrent CRUD to shared keys (long latency)" in {
-        for (i <- 1 to nIters) {
-            doCRUD("CONCURRENT CRUD to shared keys (long latency)", sharedKeys, 5 seconds)
-                .nWriteInhibits.get should not be (0)
-        }
-    }
-
-    "the cache" should "support concurrent CRUD to shared keys, with update first" in {
-        for (i <- 1 to nIters) {
-            doCRUD("CONCURRENT CRUD to shared keys, with update first", sharedKeys, 1 second, false)
-                .nWriteInhibits.get should be(0)
-        }
-    }
-
-    def sharedKeys = { i: Int => "foop_" + (i % 2) }
-
-    def doCRUD(
-        testName: String,
-        key: Int => String,
-        delay: FiniteDuration = 1 second,
-        readsFirst: Boolean = true,
-        nThreads: Int = 10): Inhibits = {
-
-        System.out.println(testName);
-
-        val exec = Executors.newFixedThreadPool(nThreads)
-        val inhibits = Inhibits()
+    it should "execute the callback on invalidating and updating an entry" in {
+        val ctr = new AtomicInteger(0)
+        val key = CacheKey("key")
 
-        for (i <- 1 to nThreads) {
-            exec.submit(new Runnable { def run() = { doReadWriteRead(key(i), inhibits, readsFirst).go(delay) } })
-        }
-
-        exec.shutdown
-        exec.awaitTermination(2, TimeUnit.MINUTES)
-
-        inhibits.debug(this)
-        inhibits
-    }
-
-    case class Inhibits(
-        nReadInhibits: AtomicInteger = new AtomicInteger(0),
-        nWriteInhibits: AtomicInteger = new AtomicInteger(0)) {
-
-        def debug(from: AnyRef) = {
-            logging.debug(from, "InhibitedReads: " + nReadInhibits)
-            logging.debug(from, "InhibitedWrites: " + nWriteInhibits)
-        }
-
-        def hasInhibits: Boolean = { nReadInhibits.get > 0 || nWriteInhibits.get > 0 }
-    }
-
-    private case class doReadWriteRead(key: String, inhibits: Inhibits = Inhibits(), readFirst: Boolean = true)(implicit logging: Logging) {
-        def go(implicit delay: FiniteDuration): Inhibits = {
-            val latch = new CountDownLatch(2)
-
-            implicit val transId = TransactionId.testing
-
-            if (!readFirst) {
-                // we want to do the update before the first read
-                cacheUpdate(key, key, delayed("bar_b")) onFailure {
-                    case t =>
-                        inhibits.nWriteInhibits.incrementAndGet();
-                }
-            }
-
-            cacheLookup(key, delayed("bar"), true) onComplete {
-                case Success(s) => {
-                    latch.countDown()
-                }
-                case Failure(t) => {
-                    latch.countDown()
-                    inhibits.nReadInhibits.incrementAndGet();
-                }
-            }
-
-            if (readFirst) {
-                // we did the read before the update, so do the write next
-                cacheUpdate(key, key, delayed("bar_b")) onFailure {
-                    case t =>
-                        inhibits.nWriteInhibits.incrementAndGet();
-                }
-            }
-
-            cacheLookup(key, delayed("bar_c"), true) onComplete {
-                case Success(s) => {
-                    latch.countDown();
-                }
-                case Failure(t) => {
-                    inhibits.nReadInhibits.incrementAndGet();
-                    latch.countDown();
+        implicit val transId = TransactionId.testing
+        lazy implicit val cacheUpdateNotifier = Some {
+            new CacheChangeNotification {
+                override def apply(key: CacheKey) = {
+                    ctr.incrementAndGet()
+                    Future.successful(())
                 }
             }
+        }
 
-            latch.await(2, TimeUnit.MINUTES)
+        // Create an cache entry
+        cacheUpdate("doc", key, Future.successful("db save successful"))
+        ctr.get shouldBe 1
 
-            inhibits
-        }
-    }
+        // Callback should be called if entry exists
+        cacheInvalidate(key, Future.successful(()))
+        ctr.get shouldBe 2
+        cacheUpdate("docdoc", key, Future.successful("update in db successful"))
+        ctr.get shouldBe 3
 
-    private def delayed[W](v: W)(implicit delay: FiniteDuration): Future[W] = {
-        akka.pattern.after(duration = delay, using = actorSystem.scheduler)(
-            Future.successful { v })
+        // Callback should be called if entry does not exist
+        cacheInvalidate(CacheKey("abc"), Future.successful(()))
+        ctr.get shouldBe 4
     }
-
-    /** we are using cache keys, so the update key is just the string itself */
-    override protected def cacheKeyForUpdate(w: String): String = (w)
 }
diff --git a/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala b/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala
index 466f326..f881510 100644
--- a/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala
@@ -32,6 +32,7 @@ import common.StreamLogging
 import common.WskActorSystem
 import whisk.core.WhiskConfig
 import whisk.core.database.DocumentConflictException
+import whisk.core.database.CacheChangeNotification
 import whisk.core.database.NoDocumentException
 import whisk.core.database.test.DbUtils
 import whisk.core.entity._
@@ -50,6 +51,8 @@ class DatastoreTests extends FlatSpec
     val datastore = WhiskEntityStore.datastore(config)
     val authstore = WhiskAuthStore.datastore(config)
 
+    implicit val cacheUpdateNotifier: Option[CacheChangeNotification] = None
+
     override def afterAll() {
         println("Shutting down store connections")
         datastore.shutdown()
diff --git a/tests/src/test/scala/whisk/core/entity/test/MigrationEntities.scala b/tests/src/test/scala/whisk/core/entity/test/MigrationEntities.scala
index 0c83ca5..a6af6e4 100644
--- a/tests/src/test/scala/whisk/core/entity/test/MigrationEntities.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/MigrationEntities.scala
@@ -58,7 +58,6 @@ object OldWhiskRule
 
     override val collectionName = "rules"
     override implicit val serdes = jsonFormat8(OldWhiskRule.apply)
-    override def cacheKeyForUpdate(t: OldWhiskRule) = t.docid.asDocInfo
 }
 
 /**
@@ -86,5 +85,4 @@ object OldWhiskTrigger
 
     override val collectionName = "triggers"
     override implicit val serdes = jsonFormat7(OldWhiskTrigger.apply)
-    override def cacheKeyForUpdate(t: OldWhiskTrigger) = t.docid.asDocInfo
 }

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].