You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/08/31 13:31:18 UTC
[incubator-openwhisk] branch master updated: Add cache invalidation
between controllers (#2624)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 78eb1f6 Add cache invalidation between controllers (#2624)
78eb1f6 is described below
commit 78eb1f6f1fb5aae2dd12d1371391695fd92fe3af
Author: Christian Bickel <gi...@cbickel.de>
AuthorDate: Thu Aug 31 15:31:15 2017 +0200
Add cache invalidation between controllers (#2624)
CUD (create, update, delete) operations generate cache invalidation messages on a dedicated message bus topic. These messages are picked up by the other controllers, which in turn invalidate their caches.
---
ansible/roles/kafka/tasks/deploy.yml | 5 +-
.../whisk/core/database/DocumentFactory.scala | 16 +-
.../MultipleReadersSingleWriterCache.scala | 48 +++--
.../core/database/RemoteCacheInvalidation.scala | 88 ++++++++++
.../main/scala/whisk/core/entity/CacheKey.scala | 55 ++++++
.../main/scala/whisk/core/entity/Identity.scala | 6 +-
.../main/scala/whisk/core/entity/WhiskAction.scala | 4 +-
.../scala/whisk/core/entity/WhiskActivation.scala | 1 -
.../scala/whisk/core/entity/WhiskPackage.scala | 1 -
.../main/scala/whisk/core/entity/WhiskRule.scala | 1 -
.../scala/whisk/core/entity/WhiskTrigger.scala | 1 -
.../main/scala/whisk/core/controller/Actions.scala | 4 +
.../scala/whisk/core/controller/ApiUtils.scala | 7 +-
.../scala/whisk/core/controller/Controller.scala | 7 +
.../scala/whisk/core/controller/Packages.scala | 4 +
.../scala/whisk/core/controller/RestAPIs.scala | 39 +++--
.../main/scala/whisk/core/controller/Rules.scala | 4 +
.../scala/whisk/core/controller/Triggers.scala | 6 +-
.../core/controller/actions/SequenceActions.scala | 2 +-
.../core/loadBalancer/LoadBalancerService.scala | 20 ++-
.../scala/whisk/core/invoker/InvokerReactive.scala | 2 +-
.../src/test/scala/ha/CacheInvalidationTests.scala | 179 +++++++++++++++++++
.../core/controller/test/ActionsApiTests.scala | 6 +-
.../core/controller/test/AuthenticateTests.scala | 4 +-
.../controller/test/ControllerTestCommon.scala | 8 +-
.../MultipleReadersSingleWriterCacheTests.scala | 195 +++------------------
.../whisk/core/entity/test/DatastoreTests.scala | 3 +
.../whisk/core/entity/test/MigrationEntities.scala | 2 -
28 files changed, 477 insertions(+), 241 deletions(-)
diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml
index fb0e2b7..615c496 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -51,10 +51,13 @@
delay: 5
- name: create the health topic
- shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic health --replication-factor 1 --partitions 1 --zookeeper {{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{ kafka.topics.health.retentionBytes }} --config retention.ms={{ kafka.topics.health.retentionMS }} --config segment.bytes={{ kafka.topics.health.segmentBytes }}'"
+ shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic {{ item }} --replication-factor 1 --partitions 1 --zookeeper {{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{ kafka.topics.health.retentionBytes }} --config retention.ms={{ kafka.topics.health.retentionMS }} --config segment.bytes={{ kafka.topics.health.segmentBytes }}'"
register: command_result
failed_when: "not ('Created topic' in command_result.stdout or 'already exists' in command_result.stdout)"
changed_when: "'Created topic' in command_result.stdout"
+ with_items:
+ - health
+ - cacheInvalidation
- name: create the active-ack topics
shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic completed{{ item.0 }} --replication-factor 1 --partitions 1 --zookeeper {{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{ kafka.topics.completed.retentionBytes }} --config retention.ms={{ kafka.topics.completed.retentionMS }} --config segment.bytes={{ kafka.topics.completed.segmentBytes }}'"
diff --git a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
index fd3f41b..451bae9 100644
--- a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala
@@ -30,6 +30,7 @@ import akka.stream.IOResult
import akka.stream.scaladsl.StreamConverters
import spray.json.JsObject
import whisk.common.TransactionId
+import whisk.core.entity.CacheKey
import whisk.core.entity.DocId
import whisk.core.entity.DocInfo
import whisk.core.entity.DocRevision
@@ -129,10 +130,11 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
* @param db the datastore client to fetch entity from
* @param doc the entity to store
* @param transid the transaction id for logging
+ * @param notifier an optional callback when cache changes
* @return Future[DocInfo] with completion to DocInfo containing the save document id and revision
*/
def put[Wsuper >: W](db: ArtifactStore[Wsuper], doc: W)(
- implicit transid: TransactionId): Future[DocInfo] = {
+ implicit transid: TransactionId, notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
Try {
require(db != null, "db undefined")
require(doc != null, "doc undefined")
@@ -140,7 +142,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
implicit val logger = db.logging
implicit val ec = db.executionContext
- val key = cacheKeyForUpdate(doc)
+ val key = CacheKey(doc)
cacheUpdate(doc, key, db.put(doc) map { docinfo =>
doc match {
@@ -157,7 +159,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
}
def attach[Wsuper >: W](db: ArtifactStore[Wsuper], doc: DocInfo, attachmentName: String, contentType: ContentType, bytes: InputStream)(
- implicit transid: TransactionId): Future[DocInfo] = {
+ implicit transid: TransactionId, notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
Try {
require(db != null, "db undefined")
@@ -166,7 +168,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
implicit val logger = db.logging
implicit val ec = db.executionContext
- val key = doc.id.asDocInfo
+ val key = CacheKey(doc.id.asDocInfo)
// invalidate the key because attachments update the revision;
// do not cache the new attachment (controller does not need it)
cacheInvalidate(key, {
@@ -180,7 +182,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
}
def del[Wsuper >: W](db: ArtifactStore[Wsuper], doc: DocInfo)(
- implicit transid: TransactionId): Future[Boolean] = {
+ implicit transid: TransactionId, notifier: Option[CacheChangeNotification]): Future[Boolean] = {
Try {
require(db != null, "db undefined")
require(doc != null, "doc undefined")
@@ -188,7 +190,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
implicit val logger = db.logging
implicit val ec = db.executionContext
- val key = doc.id.asDocInfo
+ val key = CacheKey(doc.id.asDocInfo)
cacheInvalidate(key, db.del(doc))
} match {
case Success(f) => f
@@ -221,7 +223,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] {
implicit val logger = db.logging
implicit val ec = db.executionContext
val key = doc.asDocInfo(rev)
- _ => cacheLookup(key, db.get[W](key), fromCache)
+ _ => cacheLookup(CacheKey(key), db.get[W](key), fromCache)
} match {
case Success(f) => f
case Failure(t) => Future.failed(t)
diff --git a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
index d824948..f767c60 100644
--- a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
+++ b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
@@ -22,22 +22,24 @@
package whisk.core.database
-import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
+import scala.language.implicitConversions
import scala.util.Failure
import scala.util.Success
-import scala.language.implicitConversions
+import scala.util.control.NonFatal
+
+import com.github.benmanes.caffeine.cache.Caffeine
import whisk.common.Logging
import whisk.common.LoggingMarkers
import whisk.common.TransactionId
-import com.github.benmanes.caffeine.cache.Caffeine
-import scala.util.control.NonFatal
+import whisk.core.entity.CacheKey
/**
* A cache that allows multiple readers, but only a single writer, at
@@ -89,6 +91,8 @@ private object MultipleReadersSingleWriterCache {
case class StaleRead(actualState: State) extends Exception(s"Attempted read of invalid entry due to $actualState.")
}
+trait CacheChangeNotification extends (CacheKey => Future[Unit])
+
trait MultipleReadersSingleWriterCache[W, Winfo] {
import MultipleReadersSingleWriterCache._
import MultipleReadersSingleWriterCache.State._
@@ -96,9 +100,6 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
/** Subclasses: Toggle this to enable/disable caching for your entity type. */
protected val cacheEnabled = true
- /** Subclasses: tell me what key to use for updates. */
- protected def cacheKeyForUpdate(w: W): Any
-
private object Entry {
def apply(transid: TransactionId, state: State, value: Option[Future[W]]): Entry = {
new Entry(transid, new AtomicReference(state), value)
@@ -155,11 +156,13 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
* This method posts a delete to the backing store, and either directly invalidates the cache entry
* or informs any outstanding transaction that it must invalidate the cache on completion.
*/
- protected def cacheInvalidate[R](key: Any, invalidator: => Future[R])(
- implicit ec: ExecutionContext, transid: TransactionId, logger: Logging): Future[R] = {
+ protected def cacheInvalidate[R](key: CacheKey, invalidator: => Future[R])(
+ implicit ec: ExecutionContext, transid: TransactionId, logger: Logging, notifier: Option[CacheChangeNotification]): Future[R] = {
if (cacheEnabled) {
logger.info(this, s"invalidating $key on delete")
+ notifier.foreach(_(key))
+
// try inserting our desired entry...
val desiredEntry = Entry(transid, InvalidateInProgress, None)
cache(key)(desiredEntry) flatMap { actualEntry =>
@@ -210,7 +213,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
/**
* This method may initiate a read from the backing store, and potentially stores the result in the cache.
*/
- protected def cacheLookup[Wsuper >: W](key: Any, generator: => Future[W], fromCache: Boolean = cacheEnabled)(
+ protected def cacheLookup[Wsuper >: W](key: CacheKey, generator: => Future[W], fromCache: Boolean = cacheEnabled)(
implicit ec: ExecutionContext, transid: TransactionId, logger: Logging): Future[W] = {
if (fromCache) {
val promise = Promise[W] // this promise completes with the generator value
@@ -253,9 +256,12 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
/**
* This method posts an update to the backing store, and potentially stores the result in the cache.
*/
- protected def cacheUpdate(doc: W, key: Any, generator: => Future[Winfo])(
- implicit ec: ExecutionContext, transid: TransactionId, logger: Logging): Future[Winfo] = {
+ protected def cacheUpdate(doc: W, key: CacheKey, generator: => Future[Winfo])(
+ implicit ec: ExecutionContext, transid: TransactionId, logger: Logging, notifier: Option[CacheChangeNotification]): Future[Winfo] = {
if (cacheEnabled) {
+
+ notifier.foreach(_(key))
+
// try inserting our desired entry...
val desiredEntry = Entry(transid, WriteInProgress, Some(Future.successful(doc)))
cache(key)(desiredEntry) flatMap { actualEntry =>
@@ -288,10 +294,16 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
def cacheSize: Int = cache.size
/**
+ * Removes an entry from the cache immediately. Use this method when only the cache
+ * needs to change and no update to the backing store is required.
+ */
+ protected[database] def removeId(key: CacheKey)(implicit ec: ExecutionContext): Unit = cache.remove(key)
+
+ /**
* Log a cache hit
*
*/
- private def makeNoteOfCacheHit(key: Any)(implicit transid: TransactionId, logger: Logging) = {
+ private def makeNoteOfCacheHit(key: CacheKey)(implicit transid: TransactionId, logger: Logging) = {
transid.mark(this, LoggingMarkers.DATABASE_CACHE_HIT, s"[GET] serving from cache: $key")(logger)
}
@@ -299,7 +311,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
* Log a cache miss
*
*/
- private def makeNoteOfCacheMiss(key: Any)(implicit transid: TransactionId, logger: Logging) = {
+ private def makeNoteOfCacheMiss(key: CacheKey)(implicit transid: TransactionId, logger: Logging) = {
transid.mark(this, LoggingMarkers.DATABASE_CACHE_MISS, s"[GET] serving from datastore: $key")(logger)
}
@@ -308,7 +320,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
* 1. either cache the result if there is no intervening delete or update, or
* 2. invalidate the cache because there was an intervening delete or update.
*/
- private def listenForReadDone(key: Any, entry: Entry, generator: => Future[W], promise: Promise[W])(
+ private def listenForReadDone(key: CacheKey, entry: Entry, generator: => Future[W], promise: Promise[W])(
implicit ec: ExecutionContext, transid: TransactionId, logger: Logging): Unit = {
generator onComplete {
@@ -362,7 +374,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
* 1. either cache the result if there is no intervening delete or update, or
* 2. invalidate the cache because there was an intervening delete or update
*/
- private def listenForWriteDone(key: Any, entry: Entry, generator: => Future[Winfo])(
+ private def listenForWriteDone(key: CacheKey, entry: Entry, generator: => Future[Winfo])(
implicit ec: ExecutionContext, transid: TransactionId, logger: Logging): Future[Winfo] = {
generator andThen {
@@ -398,7 +410,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
}
/** Immediately invalidates the given entry. */
- private def invalidateEntry(key: Any, entry: Entry)(
+ private def invalidateEntry(key: CacheKey, entry: Entry)(
implicit transid: TransactionId, logger: Logging): Unit = {
logger.info(this, s"invalidating $key")
entry.invalidate()
@@ -406,7 +418,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
}
/** Invalidates the given entry after a given invalidator completes. */
- private def invalidateEntryAfter[R](invalidator: => Future[R], key: Any, entry: Entry)(
+ private def invalidateEntryAfter[R](invalidator: => Future[R], key: CacheKey, entry: Entry)(
implicit ec: ExecutionContext, transid: TransactionId, logger: Logging): Future[R] = {
entry.grabInvalidationLock()
diff --git a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
new file mode 100644
index 0000000..902bf7d
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database
+
+import java.nio.charset.StandardCharsets
+
+import scala.concurrent.Future
+import scala.concurrent.duration.DurationInt
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Try
+
+import akka.actor.ActorSystem
+import akka.actor.Props
+import spray.json._
+import whisk.common.Logging
+import whisk.core.WhiskConfig
+import whisk.core.connector.Message
+import whisk.core.connector.MessageFeed
+import whisk.core.connector.MessagingProvider
+import whisk.core.entity.CacheKey
+import whisk.core.entity.InstanceId
+import whisk.core.entity.WhiskAction
+import whisk.core.entity.WhiskPackage
+import whisk.core.entity.WhiskRule
+import whisk.core.entity.WhiskTrigger
+import whisk.spi.SpiLoader
+
+/**
+ * Message exchanged on the message bus to announce that a cache entry was invalidated.
+ *
+ * @param key the cache key that was invalidated
+ * @param instanceId the id of the instance that sent the message; receivers compare it with
+ *                   their own id to skip the messages they sent themselves
+ */
+case class CacheInvalidationMessage(key: CacheKey, instanceId: String) extends Message {
+  /** Renders this message as compact JSON using the format defined in the companion object. */
+  override def serialize = {
+    val json = CacheInvalidationMessage.serdes.write(this)
+    json.compactPrint
+  }
+}
+
+/**
+ * JSON (de)serialization support for [[CacheInvalidationMessage]].
+ */
+object CacheInvalidationMessage extends DefaultJsonProtocol {
+  /**
+   * Reads and writes a message as a JSON object with the fields `key` and `instanceId`.
+   * The type is declared explicitly so that implicit resolution never depends on inference.
+   */
+  implicit val serdes: RootJsonFormat[CacheInvalidationMessage] =
+    jsonFormat(CacheInvalidationMessage.apply _, "key", "instanceId")
+
+  /**
+   * Parses the JSON representation of a message.
+   *
+   * @param msg the raw JSON string
+   * @return the parsed message, or a `Failure` holding the exception thrown while parsing
+   *         or deserializing `msg`
+   */
+  def parse(msg: String): Try[CacheInvalidationMessage] = Try(serdes.read(msg.parseJson))
+}
+
+/**
+ * Propagates cache invalidations between instances of a component over the message bus.
+ *
+ * Every instance publishes the keys it invalidates to the `cacheInvalidation` topic and also
+ * consumes that topic. Keys announced by other instances are removed from the caches of
+ * [[WhiskAction]], [[WhiskPackage]], [[WhiskRule]] and [[WhiskTrigger]].
+ *
+ * @param config the configuration handed to the messaging provider
+ * @param component the name of the component, for example "controller"; first part of the sender id
+ * @param instance the instance of the component; its integer value is the second part of the sender id
+ */
+class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance: InstanceId)(implicit logging: Logging, as: ActorSystem) {
+
+  implicit private val ec = as.dispatcher
+
+  private val topic = "cacheInvalidation"
+
+  // identifies this instance as the sender of a message, e.g. "controller0"
+  private val instanceId = s"$component${instance.toInt}"
+
+  private val msgProvider = SpiLoader.get[MessagingProvider]()
+
+  // the second argument is unique per instance (presumably the consumer group id - confirm against
+  // MessagingProvider), so that every instance receives every invalidation message
+  private val cacheInvalidationConsumer = msgProvider.getConsumer(config, s"$topic$instanceId", topic, maxPeek = 128)
+  private val cacheInvalidationProducer = msgProvider.getProducer(config, ec)
+
+  /**
+   * Publishes an invalidation message for the given key, tagged with the id of this instance.
+   *
+   * @param key the cache key that was invalidated locally
+   * @return a future that completes when the send of the producer completes
+   */
+  def notifyOtherInstancesAboutInvalidation(key: CacheKey): Future[Unit] = {
+    // map to the unit value `()`; `Unit` would name the companion object, which only
+    // type checks because the compiler silently discards the value
+    cacheInvalidationProducer.send(topic, CacheInvalidationMessage(key, instanceId)).map(_ => ())
+  }
+
+  // feed that pulls messages from the consumer and hands each one to removeFromLocalCache
+  private val invalidationFeed = as.actorOf(Props {
+    new MessageFeed(topic, logging, cacheInvalidationConsumer, cacheInvalidationConsumer.maxPeek, 1.second, removeFromLocalCache)
+  })
+
+  /**
+   * Handles one message from the invalidation topic.
+   *
+   * A message that parses and was sent by another instance removes its key from the entity caches;
+   * a message sent by this instance is ignored. A message that cannot be parsed is logged as an
+   * error. Afterwards the feed is told that the message has been processed.
+   *
+   * @param bytes the raw message, decoded as UTF-8
+   */
+  private def removeFromLocalCache(bytes: Array[Byte]): Future[Unit] = Future {
+    val raw = new String(bytes, StandardCharsets.UTF_8)
+
+    CacheInvalidationMessage.parse(raw) match {
+      case Success(msg: CacheInvalidationMessage) =>
+        // the sender has already updated its own cache
+        if (msg.instanceId != instanceId) {
+          WhiskAction.removeId(msg.key)
+          WhiskPackage.removeId(msg.key)
+          WhiskRule.removeId(msg.key)
+          WhiskTrigger.removeId(msg.key)
+        }
+      case Failure(t) => logging.error(this, s"failed processing message: $raw with $t")
+    }
+    invalidationFeed ! MessageFeed.Processed
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/entity/CacheKey.scala b/common/scala/src/main/scala/whisk/core/entity/CacheKey.scala
new file mode 100644
index 0000000..de0f6fe
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/entity/CacheKey.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.entity
+
+import spray.json.DefaultJsonProtocol
+
+/**
+ * Thrown by `CacheKey.apply(key: Any)` when the given value has a type that cannot be
+ * converted into a cache key.
+ *
+ * @param msg the description of the failure
+ */
+class UnsupportedCacheKeyTypeException(msg: String) extends Exception(msg)
+
+/**
+ * The key under which an entity is stored in the cache.
+ *
+ * @param mainId the primary part of the key, for example the id of a document
+ * @param secondaryId an optional second part of the key, for example the revision of an entity;
+ *                    it is left out of `toString` so that it is not written to the logs
+ */
+case class CacheKey(mainId: String, secondaryId: Option[String]) {
+  // print the main id only, the secondary id must not show up in the logs
+  override def toString() = s"CacheKey($mainId)"
+}
+
+/**
+ * JSON format and conversions for [[CacheKey]].
+ */
+object CacheKey extends DefaultJsonProtocol {
+  implicit val serdes = jsonFormat2(CacheKey.apply)
+
+  /**
+   * Builds the cache key for a value of one of the supported types.
+   *
+   * @param key an `EntityName`, `AuthKey`, `DocInfo`, `WhiskEntity` or `String`
+   * @return the key derived from the given value
+   * @throws UnsupportedCacheKeyTypeException if `key` has any other type
+   */
+  def apply(key: Any): CacheKey = key match {
+    case name: EntityName => CacheKey(name.asString, None)
+    // the uuid is the main id, the key itself is the secondary id
+    case auth: AuthKey    => CacheKey(auth.uuid.asString, Some(auth.key.asString))
+    case info: DocInfo =>
+      // the revision is part of the key only when the document info carries one
+      val revision = if (!info.rev.empty) Some(info.rev.asString) else None
+      CacheKey(info.id.asString, revision)
+    // an entity is keyed by the document info of its document id
+    case entity: WhiskEntity => CacheKey(entity.docid.asDocInfo)
+    case text: String        => CacheKey(text, None)
+    case others =>
+      throw new UnsupportedCacheKeyTypeException(s"Unable to apply the entity ${others.getClass} on CacheKey.")
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/entity/Identity.scala b/common/scala/src/main/scala/whisk/core/entity/Identity.scala
index d272f7a..32093c8 100644
--- a/common/scala/src/main/scala/whisk/core/entity/Identity.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/Identity.scala
@@ -45,7 +45,6 @@ object Identity extends MultipleReadersSingleWriterCache[Identity, DocInfo] with
private val viewName = "subjects/identities"
override val cacheEnabled = true
- override def cacheKeyForUpdate(i: Identity) = i.authkey
implicit val serdes = jsonFormat5(Identity.apply)
/**
@@ -58,8 +57,9 @@ object Identity extends MultipleReadersSingleWriterCache[Identity, DocInfo] with
implicit val logger: Logging = datastore.logging
implicit val ec = datastore.executionContext
val ns = namespace.asString
+ val key = CacheKey(namespace)
- cacheLookup(ns, {
+ cacheLookup(key, {
list(datastore, List(ns), limit = 1) map { list =>
list.length match {
case 1 =>
@@ -80,7 +80,7 @@ object Identity extends MultipleReadersSingleWriterCache[Identity, DocInfo] with
implicit val logger: Logging = datastore.logging
implicit val ec = datastore.executionContext
- cacheLookup(authkey, {
+ cacheLookup(CacheKey(authkey), {
list(datastore, List(authkey.uuid.asString, authkey.key.asString)) map { list =>
list.length match {
case 1 =>
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
index c4e90f4..f788ac4 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
@@ -30,6 +30,7 @@ import spray.json.DefaultJsonProtocol._
import whisk.common.TransactionId
import whisk.core.database.ArtifactStore
import whisk.core.database.DocumentFactory
+import whisk.core.database.CacheChangeNotification
import whisk.core.entity.Attachments._
import whisk.core.entity.types.EntityStore
@@ -215,11 +216,10 @@ object WhiskAction
override implicit val serdes = jsonFormat(WhiskAction.apply, "namespace", "name", "exec", "parameters", "limits", "version", "publish", "annotations")
override val cacheEnabled = true
- override def cacheKeyForUpdate(w: WhiskAction) = w.docid.asDocInfo
// overridden to store attached code
override def put[A >: WhiskAction](db: ArtifactStore[A], doc: WhiskAction)(
- implicit transid: TransactionId): Future[DocInfo] = {
+ implicit transid: TransactionId, notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
Try {
require(db != null, "db undefined")
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala
index b4f2f89..0fe0dcd 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala
@@ -117,5 +117,4 @@ object WhiskActivation
// Caching activations doesn't make much sense in the common case as usually,
// an activation is only asked for once.
override val cacheEnabled = false
- override def cacheKeyForUpdate(w: WhiskActivation) = w.docid.asDocInfo
}
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskPackage.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskPackage.scala
index e29f03f..ac73147 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskPackage.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskPackage.scala
@@ -198,7 +198,6 @@ object WhiskPackage
}
override val cacheEnabled = true
- override def cacheKeyForUpdate(w: WhiskPackage) = w.docid.asDocInfo
}
/**
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskRule.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskRule.scala
index c91cc9c..749b6f5 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskRule.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskRule.scala
@@ -233,7 +233,6 @@ object WhiskRule
}
override val cacheEnabled = false
- override def cacheKeyForUpdate(w: WhiskRule) = w.docid.asDocInfo
}
object WhiskRuleResponse extends DefaultJsonProtocol {
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskTrigger.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskTrigger.scala
index 128a7e5..d2271c4 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskTrigger.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskTrigger.scala
@@ -119,7 +119,6 @@ object WhiskTrigger
override implicit val serdes = jsonFormat8(WhiskTrigger.apply)
override val cacheEnabled = true
- override def cacheKeyForUpdate(w: WhiskTrigger) = w.docid.asDocInfo
}
object WhiskTriggerPut extends DefaultJsonProtocol {
diff --git a/core/controller/src/main/scala/whisk/core/controller/Actions.scala b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
index ba74de8..d4a59cc 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Actions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
@@ -39,6 +39,7 @@ import spray.json.DefaultJsonProtocol._
import whisk.common.TransactionId
import whisk.core.WhiskConfig
import whisk.core.controller.actions.PostActionActivation
+import whisk.core.database.CacheChangeNotification
import whisk.core.database.NoDocumentException
import whisk.core.entitlement._
import whisk.core.entity._
@@ -84,6 +85,9 @@ trait WhiskActionsApi
/** Database service to CRUD actions. */
protected val entityStore: EntityStore
+ /** Notification service for cache invalidation. */
+ protected implicit val cacheChangeNotification: Some[CacheChangeNotification]
+
/** Database service to get activations. */
protected val activationStore: ActivationStore
diff --git a/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala b/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala
index a52bd24..e6f3c51 100644
--- a/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala
@@ -22,22 +22,20 @@ import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.StatusCode
import akka.http.scaladsl.model.StatusCodes.Conflict
import akka.http.scaladsl.model.StatusCodes.InternalServerError
import akka.http.scaladsl.model.StatusCodes.NotFound
import akka.http.scaladsl.model.StatusCodes.OK
-import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.server.RequestContext
import akka.http.scaladsl.server.RouteResult
-
import spray.json.DefaultJsonProtocol._
import spray.json.JsBoolean
import spray.json.JsObject
import spray.json.JsValue
import spray.json.RootJsonFormat
-
import whisk.common.Logging
import whisk.common.TransactionId
import whisk.core.controller.PostProcess.PostProcessEntity
@@ -46,6 +44,7 @@ import whisk.core.database.ArtifactStoreException
import whisk.core.database.DocumentConflictException
import whisk.core.database.DocumentFactory
import whisk.core.database.DocumentTypeMismatchException
+import whisk.core.database.CacheChangeNotification
import whisk.core.database.NoDocumentException
import whisk.core.entity.DocId
import whisk.core.entity.WhiskDocument
@@ -276,6 +275,7 @@ trait WriteOps extends Directives {
postProcess: Option[PostProcessEntity[A]] = None)(
implicit transid: TransactionId,
format: RootJsonFormat[A],
+ notifier: Option[CacheChangeNotification],
ma: Manifest[A]) = {
// marker to return an existing doc with status OK rather than conflict if overwrite is false
case class IdentityPut(self: A) extends Throwable
@@ -347,6 +347,7 @@ trait WriteOps extends Directives {
postProcess: Option[PostProcessEntity[A]] = None)(
implicit transid: TransactionId,
format: RootJsonFormat[A],
+ notifier: Option[CacheChangeNotification],
ma: Manifest[A]) = {
onComplete(factory.get(datastore, docid) flatMap {
entity =>
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 90d4023..4037e0d 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -29,11 +29,14 @@ import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import spray.json._
import spray.json.DefaultJsonProtocol._
+
import whisk.common.AkkaLogging
import whisk.common.Logging
import whisk.common.LoggingMarkers
import whisk.common.TransactionId
import whisk.core.WhiskConfig
+import whisk.core.database.RemoteCacheInvalidation
+import whisk.core.database.CacheChangeNotification
import whisk.core.entitlement._
import whisk.core.entity._
import whisk.core.entity.ActivationId.ActivationIdGenerator
@@ -96,6 +99,10 @@ class Controller(
private implicit val authStore = WhiskAuthStore.datastore(whiskConfig)
private implicit val entityStore = WhiskEntityStore.datastore(whiskConfig)
private implicit val activationStore = WhiskActivationStore.datastore(whiskConfig)
+ private implicit val cacheChangeNotification = Some(new CacheChangeNotification {
+ val remoteCacheInvalidaton = new RemoteCacheInvalidation(whiskConfig, "controller", instance)
+ override def apply(k: CacheKey) = remoteCacheInvalidaton.notifyOtherInstancesAboutInvalidation(k)
+ })
// initialize backend services
private implicit val loadBalancer = new LoadBalancerService(whiskConfig, instance, entityStore)
diff --git a/core/controller/src/main/scala/whisk/core/controller/Packages.scala b/core/controller/src/main/scala/whisk/core/controller/Packages.scala
index 925d5c1..cca510e 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Packages.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Packages.scala
@@ -31,6 +31,7 @@ import spray.json._
import whisk.common.TransactionId
import whisk.core.database.DocumentTypeMismatchException
+import whisk.core.database.CacheChangeNotification
import whisk.core.database.NoDocumentException
import whisk.core.entitlement._
import whisk.core.entity._
@@ -46,6 +47,9 @@ trait WhiskPackagesApi extends WhiskCollectionAPI with ReferencedEntities {
/** Database service to CRUD packages. */
protected val entityStore: EntityStore
+ /** Notification service for cache invalidation. */
+ protected implicit val cacheChangeNotification: Some[CacheChangeNotification]
+
/** Route directives for API. The methods that are supported on packages. */
protected override lazy val entityOps = put | get | delete
diff --git a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
index 0e374f6..f75750e 100644
--- a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
@@ -17,31 +17,31 @@
package whisk.core.controller
+import scala.concurrent.ExecutionContext
+
import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.Uri
+import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.stream.ActorMaterializer
import spray.json._
import spray.json.DefaultJsonProtocol._
-
-import scala.concurrent.ExecutionContext
-
+import whisk.common.Logging
import whisk.common.TransactionId
+import whisk.core.database.CacheChangeNotification
import whisk.core.WhiskConfig
import whisk.core.WhiskConfig.whiskVersionBuildno
import whisk.core.WhiskConfig.whiskVersionDate
-import whisk.core.entity.WhiskAuthStore
-import whisk.common.Logging
-import whisk.common.TransactionId
-import whisk.core.entity._
-import whisk.core.entity.types._
import whisk.core.entitlement._
+import whisk.core.entity._
import whisk.core.entity.ActivationId.ActivationIdGenerator
+import whisk.core.entity.WhiskAuthStore
+import whisk.core.entity.types._
import whisk.core.loadBalancer.LoadBalancerService
/**
@@ -89,10 +89,10 @@ protected[controller] object RestApiCommons {
Authenticate.requiredProperties ++
Collection.requiredProperties
+ import akka.http.scaladsl.model.HttpCharsets
+ import akka.http.scaladsl.model.MediaTypes.`application/json`
import akka.http.scaladsl.unmarshalling.FromEntityUnmarshaller
import akka.http.scaladsl.unmarshalling.Unmarshaller
- import akka.http.scaladsl.model.MediaTypes.`application/json`
- import akka.http.scaladsl.model.HttpCharsets
/**
* Extract an empty entity into a JSON object. This is useful for the
@@ -140,6 +140,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
implicit val entitlementProvider: EntitlementProvider,
implicit val activationIdFactory: ActivationIdGenerator,
implicit val loadBalancer: LoadBalancerService,
+ implicit val cacheChangeNotification: Some[CacheChangeNotification],
implicit val activationStore: ActivationStore,
implicit val whiskConfig: WhiskConfig)
extends SwaggerDocs(Uri.Path(apiPath) / apiVersion, "apiv1swagger.json")
@@ -208,12 +209,12 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
private val web = new WebActionsApi(Seq("web"), this.WebApiDirectives)
class NamespacesApi(
- val apiPath: String,
- val apiVersion: String)(
- implicit override val entityStore: EntityStore,
- override val entitlementProvider: EntitlementProvider,
- override val executionContext: ExecutionContext,
- override val logging: Logging)
+ val apiPath: String,
+ val apiVersion: String)(
+ implicit override val entityStore: EntityStore,
+ override val entitlementProvider: EntitlementProvider,
+ override val executionContext: ExecutionContext,
+ override val logging: Logging)
extends WhiskNamespacesApi
class ActionsApi(
@@ -226,6 +227,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
override val entitlementProvider: EntitlementProvider,
override val activationIdFactory: ActivationIdGenerator,
override val loadBalancer: LoadBalancerService,
+ override val cacheChangeNotification: Some[CacheChangeNotification],
override val executionContext: ExecutionContext,
override val logging: Logging,
override val whiskConfig: WhiskConfig)
@@ -250,6 +252,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
override val entitlementProvider: EntitlementProvider,
override val activationIdFactory: ActivationIdGenerator,
override val loadBalancer: LoadBalancerService,
+ override val cacheChangeNotification: Some[CacheChangeNotification],
override val executionContext: ExecutionContext,
override val logging: Logging,
override val whiskConfig: WhiskConfig)
@@ -263,6 +266,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
override val entitlementProvider: EntitlementProvider,
override val activationIdFactory: ActivationIdGenerator,
override val loadBalancer: LoadBalancerService,
+ override val cacheChangeNotification: Some[CacheChangeNotification],
override val executionContext: ExecutionContext,
override val logging: Logging,
override val whiskConfig: WhiskConfig)
@@ -277,6 +281,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
override val activationStore: ActivationStore,
override val activationIdFactory: ActivationIdGenerator,
override val loadBalancer: LoadBalancerService,
+ override val cacheChangeNotification: Some[CacheChangeNotification],
override val executionContext: ExecutionContext,
override val logging: Logging,
override val whiskConfig: WhiskConfig,
diff --git a/core/controller/src/main/scala/whisk/core/controller/Rules.scala b/core/controller/src/main/scala/whisk/core/controller/Rules.scala
index e65c845..6f657b2 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Rules.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Rules.scala
@@ -30,6 +30,7 @@ import spray.json.DeserializationException
import whisk.common.TransactionId
import whisk.core.database.DocumentConflictException
+import whisk.core.database.CacheChangeNotification
import whisk.core.database.NoDocumentException
import whisk.core.entity._
import whisk.core.entity.types.EntityStore
@@ -54,6 +55,9 @@ trait WhiskRulesApi extends WhiskCollectionAPI with ReferencedEntities {
/** JSON response formatter. */
import RestApiCommons.jsonDefaultResponsePrinter
+ /** Notification service for cache invalidation. */
+ protected implicit val cacheChangeNotification: Some[CacheChangeNotification]
+
/** Path to Rules REST API. */
protected val rulesPath = "rules"
diff --git a/core/controller/src/main/scala/whisk/core/controller/Triggers.scala b/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
index d77d981..c9d1444 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Triggers.scala
@@ -45,6 +45,7 @@ import spray.json._
import spray.json.DefaultJsonProtocol.RootJsObjectFormat
import whisk.common.TransactionId
+import whisk.core.database.CacheChangeNotification
import whisk.core.entitlement.Collection
import whisk.core.entity.ActivationResponse
import whisk.core.entity.EntityPath
@@ -57,9 +58,9 @@ import whisk.core.entity.WhiskTrigger
import whisk.core.entity.WhiskTriggerPut
import whisk.core.entity.types.ActivationStore
import whisk.core.entity.types.EntityStore
-import whisk.http.ErrorResponse.terminate
import whisk.core.entity.Identity
import whisk.core.entity.FullyQualifiedEntityName
+import whisk.http.ErrorResponse.terminate
/** A trait implementing the triggers API. */
trait WhiskTriggersApi extends WhiskCollectionAPI {
@@ -73,6 +74,9 @@ trait WhiskTriggersApi extends WhiskCollectionAPI {
/** Database service to CRUD triggers. */
protected val entityStore: EntityStore
+ /** Notification service for cache invalidation. */
+ protected implicit val cacheChangeNotification: Some[CacheChangeNotification]
+
/** Database service to get activations. */
protected val activationStore: ActivationStore
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
index 55bfac4..e60e1dd 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
@@ -160,7 +160,7 @@ protected[actions] trait SequenceActions {
*/
private def storeSequenceActivation(activation: WhiskActivation)(implicit transid: TransactionId): Unit = {
logging.info(this, s"recording activation '${activation.activationId}'")
- WhiskActivation.put(activationStore, activation) onComplete {
+ WhiskActivation.put(activationStore, activation)(transid, notifier = None) onComplete {
case Success(id) => logging.info(this, s"recorded activation")
case Failure(t) => logging.error(this, s"failed to record activation ${activation.activationId} with error ${t.getLocalizedMessage}")
}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index b4c79ee..07ca455 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -18,6 +18,8 @@
package whisk.core.loadBalancer
import java.nio.charset.StandardCharsets
+
+import scala.annotation.tailrec
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
@@ -25,31 +27,33 @@ import scala.concurrent.Promise
import scala.concurrent.duration.DurationInt
import scala.util.Failure
import scala.util.Success
+
import org.apache.kafka.clients.producer.RecordMetadata
+
import akka.actor.ActorRefFactory
import akka.actor.ActorSystem
import akka.actor.Props
-import akka.pattern.ask
import akka.util.Timeout
+import akka.pattern.ask
+
import whisk.common.Logging
import whisk.common.LoggingMarkers
import whisk.common.TransactionId
import whisk.core.WhiskConfig
import whisk.core.WhiskConfig._
-import whisk.core.connector.MessagingProvider
import whisk.core.connector.{ ActivationMessage, CompletionMessage }
import whisk.core.connector.MessageFeed
import whisk.core.connector.MessageProducer
+import whisk.core.connector.MessagingProvider
import whisk.core.database.NoDocumentException
import whisk.core.entity.{ ActivationId, WhiskActivation }
-import whisk.core.entity.InstanceId
+import whisk.core.entity.EntityName
import whisk.core.entity.ExecutableWhiskAction
+import whisk.core.entity.Identity
+import whisk.core.entity.InstanceId
import whisk.core.entity.UUID
import whisk.core.entity.WhiskAction
import whisk.core.entity.types.EntityStore
-import scala.annotation.tailrec
-import whisk.core.entity.EntityName
-import whisk.core.entity.Identity
import whisk.spi.SpiLoader
trait LoadBalancer {
@@ -162,9 +166,9 @@ class LoadBalancerService(
private def createTestActionForInvokerHealth(db: EntityStore, action: WhiskAction): Future[Unit] = {
implicit val tid = TransactionId.loadbalancer
WhiskAction.get(db, action.docid).flatMap { oldAction =>
- WhiskAction.put(db, action.revision(oldAction.rev))
+ WhiskAction.put(db, action.revision(oldAction.rev))(tid, notifier = None)
}.recover {
- case _: NoDocumentException => WhiskAction.put(db, action)
+ case _: NoDocumentException => WhiskAction.put(db, action)(tid, notifier = None)
}.map(_ => {}).andThen {
case Success(_) => logging.info(this, "test action for invoker health now exists")
case Failure(e) => logging.error(this, s"error creating test action for invoker health: $e")
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 1f64625..0f8a2cf 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -140,7 +140,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
val store = (tid: TransactionId, activation: WhiskActivation) => {
implicit val transid = tid
logging.info(this, "recording the activation result to the data store")
- WhiskActivation.put(activationStore, activation).andThen {
+ WhiskActivation.put(activationStore, activation)(tid, notifier = None).andThen {
case Success(id) => logging.info(this, s"recorded activation")
case Failure(t) => logging.error(this, s"failed to record activation")
}
diff --git a/tests/src/test/scala/ha/CacheInvalidationTests.scala b/tests/src/test/scala/ha/CacheInvalidationTests.scala
new file mode 100644
index 0000000..ecda623
--- /dev/null
+++ b/tests/src/test/scala/ha/CacheInvalidationTests.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ha
+
+import scala.concurrent.Await
+import scala.concurrent.duration.DurationInt
+
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.marshalling.Marshal
+import akka.http.scaladsl.model.HttpMethods
+import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.model.RequestEntity
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.Uri
+import akka.http.scaladsl.model.headers.Authorization
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import akka.stream.ActorMaterializer
+import common.WhiskProperties
+import common.WskActorSystem
+import common.WskTestHelpers
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+import whisk.core.WhiskConfig
+import akka.http.scaladsl.model.StatusCode
+
+/**
+ * Checks that a change made to an action through one controller becomes visible through a
+ * second controller, i.e. that the second controller does not keep serving a stale cached copy.
+ *
+ * The test bodies only run when at least two controller instances are configured.
+ */
+@RunWith(classOf[JUnitRunner])
+class CacheInvalidationTests
+    extends FlatSpec
+    with Matchers
+    with WskTestHelpers
+    with WskActorSystem {
+
+  implicit val materializer: ActorMaterializer = ActorMaterializer()
+
+  val hosts = WhiskProperties.getProperty("controller.hosts").split(",")
+  val authKey = WhiskProperties.readAuthKey(WhiskProperties.getAuthFileForTesting)
+
+  val timeout = 15.seconds
+
+  /** Re-evaluates `fn` up to 15 times, pausing one second between attempts. */
+  def retry[T](fn: => T) = whisk.utils.retry(fn, 15, Some(1.second))
+
+  /** Basic-auth header built once from the "user:password" shaped auth key. */
+  private val authHeader = {
+    val credentials = authKey.split(":")
+    Authorization(BasicHttpCredentials(credentials(0), credentials(1)))
+  }
+
+  /** True when the deployment under test is configured with two or more controllers. */
+  private def hasMultipleControllers =
+    WhiskProperties.getProperty(WhiskConfig.controllerInstances).toInt >= 2
+
+  /** URI of action `name` (default namespace) on the controller with the given index. */
+  private def actionUri(name: String, controllerInstance: Int): Uri = {
+    require(controllerInstance >= 0 && controllerInstance < hosts.length, "Controller instance not known.")
+
+    Uri()
+      .withScheme("http")
+      .withHost(hosts(controllerInstance))
+      .withPort(WhiskProperties.getControllerBasePort + controllerInstance)
+      .withPath(Uri.Path(s"/api/v1/namespaces/_/actions/$name"))
+  }
+
+  /**
+   * Sends `request` and yields the response body as JSON.
+   *
+   * @param expectedCode when defined, the response status must equal this code
+   * @param clue prefix of the failure message, followed by the response body
+   */
+  private def send(request: HttpRequest, expectedCode: Option[StatusCode], clue: String) = {
+    Http().singleRequest(request).flatMap { response =>
+      Unmarshal(response).to[JsObject].map { resBody =>
+        // foreach, not map: the check is executed for its side effect only
+        expectedCode.foreach { code =>
+          withClue(s"$clue $resBody")(response.status shouldBe code)
+        }
+        resBody
+      }
+    }
+  }
+
+  /**
+   * Creates or overwrites the action `name` with the given code via one controller.
+   *
+   * @return the JSON body of the response; the status must be 200
+   */
+  def updateAction(name: String, code: String, controllerInstance: Int = 0) = {
+    val uri = actionUri(name, controllerInstance).withQuery(Uri.Query("overwrite" -> true.toString))
+
+    val body = JsObject(
+      "namespace" -> JsString("_"),
+      "name" -> JsString(name),
+      "exec" -> JsObject("kind" -> JsString("nodejs:default"), "code" -> JsString(code)))
+
+    val request = Marshal(body).to[RequestEntity].flatMap { entity =>
+      send(
+        HttpRequest(method = HttpMethods.PUT, uri = uri, headers = List(authHeader), entity = entity),
+        Some(StatusCodes.OK),
+        "Error in Body:")
+    }
+
+    Await.result(request, timeout)
+  }
+
+  /**
+   * Fetches the action `name` from one controller.
+   *
+   * @param expectedCode status the controller must answer with
+   * @return the JSON body of the response
+   */
+  def getAction(name: String, controllerInstance: Int = 0, expectedCode: StatusCode = StatusCodes.OK) = {
+    val request = send(
+      HttpRequest(method = HttpMethods.GET, uri = actionUri(name, controllerInstance), headers = List(authHeader)),
+      Some(expectedCode),
+      "Wrong statuscode from controller. Body is:")
+
+    Await.result(request, timeout)
+  }
+
+  /**
+   * Deletes the action `name` via one controller.
+   *
+   * @param expectedCode status the controller must answer with; None accepts any status
+   * @return the JSON body of the response
+   */
+  def deleteAction(name: String, controllerInstance: Int = 0, expectedCode: Option[StatusCode] = Some(StatusCodes.OK)) = {
+    val request = send(
+      HttpRequest(method = HttpMethods.DELETE, uri = actionUri(name, controllerInstance), headers = List(authHeader)),
+      expectedCode,
+      "Wrong statuscode from controller. Body is:")
+
+    Await.result(request, timeout)
+  }
+
+  behavior of "the cache"
+
+  it should "be invalidated on updating an entity" in {
+    if (hasMultipleControllers) {
+      val actionName = "invalidateRemoteCacheOnUpdate"
+
+      // Best-effort cleanup: the action may or may not exist, so any status is accepted
+      deleteAction(actionName, 0, None)
+      deleteAction(actionName, 1, None)
+
+      // Create an action on controller0
+      val createdAction = updateAction(actionName, "CODE_CODE_CODE", 0)
+
+      // Get action from controller1
+      val actionFromController1 = getAction(actionName, 1)
+      createdAction shouldBe actionFromController1
+
+      // Update the action on controller0
+      val updatedAction = updateAction(actionName, "CODE_CODE", 0)
+
+      retry({
+        // Get action from controller1
+        val updatedActionFromController1 = getAction(actionName, 1)
+        updatedAction shouldBe updatedActionFromController1
+      })
+    }
+  }
+
+  it should "be invalidated on deleting an entity" in {
+    if (hasMultipleControllers) {
+      val actionName = "invalidateRemoteCacheOnDelete"
+
+      // Best-effort cleanup: the action may or may not exist, so any status is accepted
+      deleteAction(actionName, 0, None)
+      deleteAction(actionName, 1, None)
+
+      // Create an action on controller0
+      val createdAction = updateAction(actionName, "CODE_CODE_CODE", 0)
+      // Get action from controller1 (Now it's in the cache of controller 0 and 1)
+      val actionFromController1 = getAction(actionName, 1)
+      createdAction shouldBe actionFromController1
+
+      // Delete the action on controller0 (It should be deleted automatically from the cache of controller1).
+      // The delete must happen exactly once, outside of the retry: a repeated delete answers 404
+      // and would fail the status check on every further attempt.
+      deleteAction(actionName, 0)
+
+      retry({
+        // Get action from controller1 should fail with 404
+        getAction(actionName, 1, StatusCodes.NotFound)
+      })
+    }
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
index 150a04e..1aae330 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala
@@ -469,7 +469,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
action.parameters, action.limits, action.version,
action.publish, action.annotations ++ Parameters(WhiskAction.execFieldName, NODEJS6)))
}
- stream.toString should include regex (s"caching*.*${action.docid.asDocInfo}")
+ stream.toString should include(s"caching ${CacheKey(action)}")
stream.reset()
// second request should fetch from cache
@@ -481,7 +481,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
action.publish, action.annotations ++ Parameters(WhiskAction.execFieldName, NODEJS6)))
}
- stream.toString should include regex (s"serving from cache:*.*${action.docid.asDocInfo}")
+ stream.toString should include(s"serving from cache: ${CacheKey(action)}")
stream.reset()
// delete should invalidate cache
@@ -492,7 +492,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
action.parameters, action.limits, action.version,
action.publish, action.annotations ++ Parameters(WhiskAction.execFieldName, NODEJS6)))
}
- stream.toString should include regex (s"invalidating*.*${action.docid.asDocInfo}")
+ stream.toString should include(s"invalidating ${CacheKey(action)}")
stream.reset()
}
diff --git a/tests/src/test/scala/whisk/core/controller/test/AuthenticateTests.scala b/tests/src/test/scala/whisk/core/controller/test/AuthenticateTests.scala
index e964856..2c6415b 100644
--- a/tests/src/test/scala/whisk/core/controller/test/AuthenticateTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/AuthenticateTests.scala
@@ -62,14 +62,14 @@ class AuthenticateTests extends ControllerTestCommon with Authenticate {
user.get shouldBe Identity(subject, ns.name, ns.authkey, Privilege.ALL)
// first lookup should have been from datastore
- stream.toString should include regex (s"serving from datastore: ${ns.authkey.uuid.asString}")
+ stream.toString should include(s"serving from datastore: ${CacheKey(ns.authkey)}")
stream.reset()
// repeat query, now should be served from cache
val cachedUser = Await.result(validateCredentials(Some(pass))(transid()), dbOpTimeout)
cachedUser.get shouldBe Identity(subject, ns.name, ns.authkey, Privilege.ALL)
- stream.toString should include regex (s"serving from cache: ${ns.authkey.uuid.asString}")
+ stream.toString should include(s"serving from cache: ${CacheKey(ns.authkey)}")
stream.reset()
}
diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index dbfa17f..f899e81 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -42,6 +42,7 @@ import whisk.core.connector.ActivationMessage
import whisk.core.controller.RestApiCommons
import whisk.core.controller.WhiskServices
import whisk.core.database.DocumentFactory
+import whisk.core.database.CacheChangeNotification
import whisk.core.database.test.DbUtils
import whisk.core.entitlement._
import whisk.core.entity._
@@ -85,6 +86,12 @@ protected trait ControllerTestCommon
override def make = fixedId
}
+ /**
+  * Notifier stub for the controller tests: apply completes immediately and notifies nobody.
+  * The type is declared explicitly so it matches the Some[CacheChangeNotification] members
+  * declared by the API traits instead of relying on inference.
+  */
+ implicit val cacheChangeNotification: Some[CacheChangeNotification] = Some {
+   new CacheChangeNotification {
+     override def apply(k: CacheKey): Future[Unit] = Future.successful(())
+   }
+ }
+
val entityStore = WhiskEntityStore.datastore(whiskConfig)
val activationStore = WhiskActivationStore.datastore(whiskConfig)
val authStore = WhiskAuthStore.datastore(whiskConfig)
@@ -169,7 +176,6 @@ protected trait ControllerTestCommon
with DefaultJsonProtocol {
implicit val serdes = jsonFormat5(BadEntity.apply)
override val cacheEnabled = true
- override def cacheKeyForUpdate(w: BadEntity) = w.docid.asDocInfo
}
}
diff --git a/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala b/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
index f59c5f1..6a8f272 100644
--- a/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
+++ b/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
@@ -17,197 +17,58 @@
package whisk.core.database.test
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.Executors
-import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
-import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
-import scala.language.postfixOps
-import scala.util.Failure
-import scala.util.Success
+import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
import common.StreamLogging
import common.WskActorSystem
-import whisk.common.Logging
import whisk.common.TransactionId
+import whisk.core.database.CacheChangeNotification
import whisk.core.database.MultipleReadersSingleWriterCache
+import whisk.core.entity.CacheKey
-class MultipleReadersSingleWriterCacheTests(nIters: Int = 3) extends FlatSpec
+@RunWith(classOf[JUnitRunner])
+class MultipleReadersSingleWriterCacheTests extends FlatSpec
with Matchers
with MultipleReadersSingleWriterCache[String, String]
with WskActorSystem
with StreamLogging {
- "the cache" should "support simple CRUD" in {
- val inhibits = doReadWriteRead("foo").go(0 seconds)
- inhibits.debug(this)
+ behavior of "the cache"
- inhibits.nReadInhibits.get should be(0)
- cacheSize should be(1)
- }
-
- "the cache" should "support concurrent CRUD to different keys" in {
- //
- // for the first iter, all reads are not-cached and each thread
- // requests a different key, so we expect no read inhibits, and a
- // bunch of write inhibits
- //
- val inhibits = doCRUD("CONCURRENT CRUD to different keys", { i => "foop_" + i })
- inhibits.nReadInhibits.get should be(0)
- inhibits.nWriteInhibits.get should not be (0)
-
- //
- // after the first iter, the keys already exist, so the first read
- // should be cached, resulting in the writes proceeding more
- // smoothly this time, thus inhibiting some of the second reads
- //
- for (i <- 1 to nIters - 1) {
- doCRUD("CONCURRENT CRUD to different keys", { i => "foop_" + i })
- .nReadInhibits.get should not be (0)
- }
- }
-
- "the cache" should "support concurrent CRUD to shared keys" in {
- for (i <- 1 to nIters) {
- doCRUD("CONCURRENT CRUD to shared keys", sharedKeys)
- .nWriteInhibits.get should not be (0)
- }
- }
-
- "the cache" should "support concurrent CRUD to shared keys (zero latency)" in {
- var hasInhibits = false
- for (i <- 1 to nIters) {
- hasInhibits = doCRUD("concurrent CRUD to shared keys (zero latency)", sharedKeys, 0 seconds)
- .hasInhibits
- }
- hasInhibits should not be (false)
- }
-
- "the cache" should "support concurrent CRUD to shared keys (short latency)" in {
- for (i <- 1 to nIters) {
- doCRUD("concurrent CRUD to shared keys (short latency)", sharedKeys, 10 milliseconds)
- .hasInhibits should be(true)
- }
- }
-
- "the cache" should "support concurrent CRUD to shared keys (medium latency)" in {
- for (i <- 1 to nIters) {
- doCRUD("concurrent CRUD to shared keys (medium latency)", sharedKeys, 100 milliseconds)
- .hasInhibits should be(true)
- }
- }
-
- "the cache" should "support concurrent CRUD to shared keys (long latency)" in {
- for (i <- 1 to nIters) {
- doCRUD("CONCURRENT CRUD to shared keys (long latency)", sharedKeys, 5 seconds)
- .nWriteInhibits.get should not be (0)
- }
- }
-
- "the cache" should "support concurrent CRUD to shared keys, with update first" in {
- for (i <- 1 to nIters) {
- doCRUD("CONCURRENT CRUD to shared keys, with update first", sharedKeys, 1 second, false)
- .nWriteInhibits.get should be(0)
- }
- }
-
- def sharedKeys = { i: Int => "foop_" + (i % 2) }
-
- def doCRUD(
- testName: String,
- key: Int => String,
- delay: FiniteDuration = 1 second,
- readsFirst: Boolean = true,
- nThreads: Int = 10): Inhibits = {
-
- System.out.println(testName);
-
- val exec = Executors.newFixedThreadPool(nThreads)
- val inhibits = Inhibits()
+ it should "execute the callback on invalidating and updating an entry" in {
+ val ctr = new AtomicInteger(0)
+ val key = CacheKey("key")
- for (i <- 1 to nThreads) {
- exec.submit(new Runnable { def run() = { doReadWriteRead(key(i), inhibits, readsFirst).go(delay) } })
- }
-
- exec.shutdown
- exec.awaitTermination(2, TimeUnit.MINUTES)
-
- inhibits.debug(this)
- inhibits
- }
-
- case class Inhibits(
- nReadInhibits: AtomicInteger = new AtomicInteger(0),
- nWriteInhibits: AtomicInteger = new AtomicInteger(0)) {
-
- def debug(from: AnyRef) = {
- logging.debug(from, "InhibitedReads: " + nReadInhibits)
- logging.debug(from, "InhibitedWrites: " + nWriteInhibits)
- }
-
- def hasInhibits: Boolean = { nReadInhibits.get > 0 || nWriteInhibits.get > 0 }
- }
-
- private case class doReadWriteRead(key: String, inhibits: Inhibits = Inhibits(), readFirst: Boolean = true)(implicit logging: Logging) {
- def go(implicit delay: FiniteDuration): Inhibits = {
- val latch = new CountDownLatch(2)
-
- implicit val transId = TransactionId.testing
-
- if (!readFirst) {
- // we want to do the update before the first read
- cacheUpdate(key, key, delayed("bar_b")) onFailure {
- case t =>
- inhibits.nWriteInhibits.incrementAndGet();
- }
- }
-
- cacheLookup(key, delayed("bar"), true) onComplete {
- case Success(s) => {
- latch.countDown()
- }
- case Failure(t) => {
- latch.countDown()
- inhibits.nReadInhibits.incrementAndGet();
- }
- }
-
- if (readFirst) {
- // we did the read before the update, so do the write next
- cacheUpdate(key, key, delayed("bar_b")) onFailure {
- case t =>
- inhibits.nWriteInhibits.incrementAndGet();
- }
- }
-
- cacheLookup(key, delayed("bar_c"), true) onComplete {
- case Success(s) => {
- latch.countDown();
- }
- case Failure(t) => {
- inhibits.nReadInhibits.incrementAndGet();
- latch.countDown();
+ implicit val transId = TransactionId.testing
+ lazy implicit val cacheUpdateNotifier = Some {
+ new CacheChangeNotification {
+ override def apply(key: CacheKey) = {
+ ctr.incrementAndGet()
+ Future.successful(())
}
}
+ }
- latch.await(2, TimeUnit.MINUTES)
+ // Create a cache entry
+ cacheUpdate("doc", key, Future.successful("db save successful"))
+ ctr.get shouldBe 1
- inhibits
- }
- }
+ // Callback should be called if entry exists
+ cacheInvalidate(key, Future.successful(()))
+ ctr.get shouldBe 2
+ cacheUpdate("docdoc", key, Future.successful("update in db successful"))
+ ctr.get shouldBe 3
- private def delayed[W](v: W)(implicit delay: FiniteDuration): Future[W] = {
- akka.pattern.after(duration = delay, using = actorSystem.scheduler)(
- Future.successful { v })
+ // Callback should be called if entry does not exist
+ cacheInvalidate(CacheKey("abc"), Future.successful(()))
+ ctr.get shouldBe 4
}
-
- /** we are using cache keys, so the update key is just the string itself */
- override protected def cacheKeyForUpdate(w: String): String = (w)
}
diff --git a/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala b/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala
index 466f326..f881510 100644
--- a/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala
@@ -32,6 +32,7 @@ import common.StreamLogging
import common.WskActorSystem
import whisk.core.WhiskConfig
import whisk.core.database.DocumentConflictException
+import whisk.core.database.CacheChangeNotification
import whisk.core.database.NoDocumentException
import whisk.core.database.test.DbUtils
import whisk.core.entity._
@@ -50,6 +51,8 @@ class DatastoreTests extends FlatSpec
val datastore = WhiskEntityStore.datastore(config)
val authstore = WhiskAuthStore.datastore(config)
+ implicit val cacheUpdateNotifier: Option[CacheChangeNotification] = None
+
override def afterAll() {
println("Shutting down store connections")
datastore.shutdown()
diff --git a/tests/src/test/scala/whisk/core/entity/test/MigrationEntities.scala b/tests/src/test/scala/whisk/core/entity/test/MigrationEntities.scala
index 0c83ca5..a6af6e4 100644
--- a/tests/src/test/scala/whisk/core/entity/test/MigrationEntities.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/MigrationEntities.scala
@@ -58,7 +58,6 @@ object OldWhiskRule
override val collectionName = "rules"
override implicit val serdes = jsonFormat8(OldWhiskRule.apply)
- override def cacheKeyForUpdate(t: OldWhiskRule) = t.docid.asDocInfo
}
/**
@@ -86,5 +85,4 @@ object OldWhiskTrigger
override val collectionName = "triggers"
override implicit val serdes = jsonFormat7(OldWhiskTrigger.apply)
- override def cacheKeyForUpdate(t: OldWhiskTrigger) = t.docid.asDocInfo
}
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].