You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ch...@apache.org on 2019/07/29 06:42:04 UTC

[incubator-openwhisk] branch master updated: CosmosDB Cache Invalidation Service (#4314)

This is an automated email from the ASF dual-hosted git repository.

chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new ce45d54  CosmosDB Cache Invalidation Service (#4314)
ce45d54 is described below

commit ce45d54c824ef6c3e5d98ce0b220b924c81e688b
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Mon Jul 29 12:11:52 2019 +0530

    CosmosDB Cache Invalidation Service (#4314)
    
    Adds a new module to perform cache invalidation when running OpenWhisk with CosmosDB in multi region deployment mode. This service would ensure that changes made to whisk entities in a different OpenWhisk cluster are reflected in the caches maintained by the Controller.
---
 common/scala/src/main/resources/logback.xml        |   1 +
 .../apache/openwhisk/common/ExecutorCloser.scala   |  44 ++++++++
 core/cosmosdb/cache-invalidator/.dockerignore      |   4 +
 core/cosmosdb/cache-invalidator/Dockerfile         |  34 ++++++
 core/cosmosdb/cache-invalidator/README.md          |  66 +++++++++++
 core/cosmosdb/cache-invalidator/build.gradle       |  55 ++++++++++
 core/cosmosdb/cache-invalidator/docker-compose.yml |  35 ++++++
 core/cosmosdb/cache-invalidator/init.sh            |  25 +++++
 .../src/main/resources/application.conf            |  24 ++++
 .../src/main/resources/reference.conf              |  57 ++++++++++
 .../src/main/resources/whisk-logback.xml           |  27 +++++
 .../database/cosmosdb/cache/CacheInvalidator.scala |  78 +++++++++++++
 .../cosmosdb/cache/CacheInvalidatorConfig.scala    |  73 ++++++++++++
 .../cosmosdb/cache/ChangeFeedListener.scala        |  77 +++++++++++++
 .../cosmosdb/cache/KafkaEventProducer.scala        |  69 ++++++++++++
 .../core/database/cosmosdb/cache/Main.scala        |  37 +++++++
 .../cosmosdb/cache/WhiskChangeEventObserver.scala  | 110 +++++++++++++++++++
 settings.gradle                                    |   1 +
 tests/build.gradle                                 |   8 +-
 tests/src/test/resources/logback-test.xml          |   6 +
 .../database/cosmosdb/CosmosDBTestSupport.scala    |   4 +-
 .../cosmosdb/cache/CacheInvalidatorTests.scala     | 122 +++++++++++++++++++++
 .../cache/WhiskChangeEventObserverTests.scala      |  83 ++++++++++++++
 23 files changed, 1036 insertions(+), 4 deletions(-)

diff --git a/common/scala/src/main/resources/logback.xml b/common/scala/src/main/resources/logback.xml
index 1a10343..1aa40ea 100644
--- a/common/scala/src/main/resources/logback.xml
+++ b/common/scala/src/main/resources/logback.xml
@@ -1,6 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <configuration>
   <jmxConfigurator></jmxConfigurator>
+  <include optional="true" resource="whisk-logback.xml"/>
   <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
     <encoder>
       <pattern>[%d{yyyy-MM-dd'T'HH:mm:ss.SSS'Z'}] [%p] %msg%n</pattern>
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/ExecutorCloser.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/ExecutorCloser.scala
new file mode 100644
index 0000000..134cf7a
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/ExecutorCloser.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common
+import java.io.Closeable
+import java.util.concurrent.{ExecutorService, TimeUnit}
+
+import akka.event.slf4j.SLF4JLogging
+
+import scala.concurrent.duration._
+
+/** Closeable adapter which shuts down `service`, waiting up to `timeout` before forcing termination. */
+case class ExecutorCloser(service: ExecutorService, timeout: FiniteDuration = 5.seconds)
+    extends Closeable
+    with SLF4JLogging {
+  override def close(): Unit = {
+    try {
+      service.shutdown()
+      service.awaitTermination(timeout.toMillis, TimeUnit.MILLISECONDS)
+    } catch {
+      case e: InterruptedException =>
+        log.error("Error while shutting down the ExecutorService", e)
+        Thread.currentThread.interrupt()
+    } finally {
+      // isShutdown is always true once shutdown() has been invoked; isTerminated tells if the tasks really finished
+      if (!service.isTerminated) log.warn(s"ExecutorService `$service` didn't shutdown properly. Will be forced now.")
+      service.shutdownNow()
+    }
+  }
+}
diff --git a/core/cosmosdb/cache-invalidator/.dockerignore b/core/cosmosdb/cache-invalidator/.dockerignore
new file mode 100644
index 0000000..d6a369a
--- /dev/null
+++ b/core/cosmosdb/cache-invalidator/.dockerignore
@@ -0,0 +1,4 @@
+*
+!init.sh
+!build/distributions
+!build/tmp/docker-coverage
diff --git a/core/cosmosdb/cache-invalidator/Dockerfile b/core/cosmosdb/cache-invalidator/Dockerfile
new file mode 100644
index 0000000..e8df857
--- /dev/null
+++ b/core/cosmosdb/cache-invalidator/Dockerfile
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM scala
+
+ENV UID=1001 \
+    NOT_ROOT_USER=owuser
+
+
+# Copy app jars
+ADD build/distributions/cache-invalidator.tar /
+
+COPY init.sh /
+RUN chmod +x init.sh
+
+RUN adduser -D -u ${UID} -h /home/${NOT_ROOT_USER} -s /bin/bash ${NOT_ROOT_USER}
+USER ${NOT_ROOT_USER}
+
+EXPOSE 8080
+CMD ["./init.sh", "0"]
diff --git a/core/cosmosdb/cache-invalidator/README.md b/core/cosmosdb/cache-invalidator/README.md
new file mode 100644
index 0000000..f420d74
--- /dev/null
+++ b/core/cosmosdb/cache-invalidator/README.md
@@ -0,0 +1,66 @@
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+# OpenWhisk Cache Invalidator Service
+
+This service performs cache invalidation in an OpenWhisk cluster to enable cache event propagation in multi region setups.
+
+## Design
+
+An OpenWhisk cluster uses a Kafka topic `cacheInvalidation` to communicate changes to any cached entity. Messages on this
+topic are of the form
+
+```json
+{"instanceId":"controller0","key":{"mainId":"guest/hello"}}
+```
+
+When deploying OpenWhisk across multiple nodes which do not share a common Kafka instance, we need a way to propagate the
+cache-change events across the cluster. For CosmosDB based setups this can be done by using [CosmosDB ChangeFeed][1]
+support. It enables reading changes that are made to any specific collection.
+
+This service makes use of the [change feed processor][2] Java library to listen to changes happening in the `whisks` and `subjects`
+collections and converts them into Kafka message events which are sent to the `cacheInvalidation` topic local to the cluster.
+
+## Usage
+
+The service needs the following env variables to be set:
+
+- `KAFKA_HOSTS` - For local env it can be set to `172.17.0.1:9093`. When using [OpenWhisk Devtools][3] based setup use `kafka`
+- `COSMOSDB_ENDPOINT` - Endpoint URL like https://<account>.documents.azure.com:443/
+- `COSMOSDB_KEY` - DB Key
+- `COSMOSDB_NAME` - DB name
+
+Upon startup it would create a collection named `cache-invalidator-lease` to manage the lease data. For events sent by
+this service, `instanceId` is set to `cache-invalidator`.
+
+## Local Run
+
+Setup the OpenWhisk cluster using [devtools][3] but have it connect to CosmosDB. This would also start
+the [Kafka Topic UI][4] at port `8001`. Then when changes are made to the database, you should see events sent to the Kafka
+topic. For example, if a package is created with `wsk package create test-package` using the guest account, the following
+event is generated:
+
+```json
+ {"instanceId":"cache-invalidator","key":{"mainId":"guest/test-package"}}
+```
+
+
+[1]: https://docs.microsoft.com/en-us/azure/cosmos-db/change-feed
+[2]: https://github.com/Azure/azure-documentdb-changefeedprocessor-java
+[3]: https://github.com/apache/incubator-openwhisk-devtools/tree/master/docker-compose
+[4]: https://github.com/Landoop/kafka-topics-ui
diff --git a/core/cosmosdb/cache-invalidator/build.gradle b/core/cosmosdb/cache-invalidator/build.gradle
new file mode 100644
index 0000000..1eae3b1
--- /dev/null
+++ b/core/cosmosdb/cache-invalidator/build.gradle
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'scala'
+apply plugin: 'application'
+apply plugin: 'eclipse'
+apply plugin: 'maven'
+apply plugin: 'org.scoverage'
+
+ext.dockerImageName = 'cache-invalidator-cosmosdb'
+apply from: "${project.rootProject.projectDir.absolutePath}/gradle/docker.gradle"
+distDocker.dependsOn ':common:scala:distDocker', 'distTar'
+
+project.archivesBaseName = "openwhisk-cache-invalidator-cosmosdb"
+
+ext.coverageJars = [
+    "${buildDir}/libs/${project.archivesBaseName}-$version-scoverage.jar",
+    "${project(':common:scala').buildDir.absolutePath}/libs/openwhisk-common-$version-scoverage.jar"
+]
+distDockerCoverage.dependsOn ':common:scala:jarScoverage', 'jarScoverage'
+
+repositories {
+    mavenCentral()
+}
+
+dependencies {
+    compile "org.scala-lang:scala-library:${gradle.scala.version}"
+    compile project(':common:scala')
+    compile ("com.microsoft.azure:azure-cosmos-changefeedprocessor:0.9.2") {
+        exclude group: 'junit'
+        exclude group: 'commons-logging'
+    }
+    compile 'com.typesafe.akka:akka-stream-kafka_2.12:1.0'
+    scoverage gradle.scoverage.deps
+}
+
+tasks.withType(ScalaCompile) {
+    scalaCompileOptions.additionalParameters = gradle.scala.compileFlags
+}
+
+mainClassName = "org.apache.openwhisk.core.database.cosmosdb.cache.Main"
diff --git a/core/cosmosdb/cache-invalidator/docker-compose.yml b/core/cosmosdb/cache-invalidator/docker-compose.yml
new file mode 100644
index 0000000..671bf13
--- /dev/null
+++ b/core/cosmosdb/cache-invalidator/docker-compose.yml
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+---
+version: '3'
+
+networks:
+  default:
+    external:
+      # Match the network name used by devtool docker compose setup
+      name: openwhisk_default
+
+services:
+  cache-invalidator:
+    image: whisk/cache-invalidator-cosmosdb
+    ports:
+      - "8080:8080"
+    environment:
+      - KAFKA_HOSTS=172.17.0.1:9093
+      - COSMOSDB_ENDPOINT=${COSMOSDB_ENDPOINT}
+      - COSMOSDB_KEY=${COSMOSDB_KEY}
+      - COSMOSDB_NAME=${COSMOSDB_NAME}
diff --git a/core/cosmosdb/cache-invalidator/init.sh b/core/cosmosdb/cache-invalidator/init.sh
new file mode 100644
index 0000000..ef58816
--- /dev/null
+++ b/core/cosmosdb/cache-invalidator/init.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+./copyJMXFiles.sh
+
+export CACHE_INVALIDATOR_OPTS
+CACHE_INVALIDATOR_OPTS="$CACHE_INVALIDATOR_OPTS $(./transformEnvironment.sh)"
+
+exec cache-invalidator/bin/cache-invalidator "$@"
diff --git a/core/cosmosdb/cache-invalidator/src/main/resources/application.conf b/core/cosmosdb/cache-invalidator/src/main/resources/application.conf
new file mode 100644
index 0000000..3baafbb
--- /dev/null
+++ b/core/cosmosdb/cache-invalidator/src/main/resources/application.conf
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+akka.kafka.producer {
+  # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
+  # can be defined in this configuration section.
+  kafka-clients {
+    bootstrap.servers = ${?KAFKA_HOSTS}
+  }
+}
diff --git a/core/cosmosdb/cache-invalidator/src/main/resources/reference.conf b/core/cosmosdb/cache-invalidator/src/main/resources/reference.conf
new file mode 100644
index 0000000..d31607e
--- /dev/null
+++ b/core/cosmosdb/cache-invalidator/src/main/resources/reference.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+whisk {
+  cache-invalidator {
+    cosmosdb {
+      # Endpoint URL like https://<account>.documents.azure.com:443/
+      endpoint            = ${?COSMOSDB_ENDPOINT}
+      # Access key
+      key                 = ${?COSMOSDB_KEY}
+      # Database name
+      db                  = ${?COSMOSDB_NAME}
+
+      # Name of collection in which lease related data is stored
+      lease-collection    = "cache-invalidator-lease"
+
+      # Feed processor host name
+      # If multiple instances are running then set it to some unique name
+      hostname            = "cache-invalidator"
+
+      collections {
+        # Provide collection specific connection info here
+        # This can be used if lease collection is to be placed in a separate endpoint/db
+        # - whisks
+        # - subjects
+      }
+    }
+    # HTTP Server port
+    port = 8080
+
+    # Timeout for waiting for batch of feed changes to be published as Kafka event
+    feed-publish-timeout = 120 s
+
+    # Current clusterId - If configured then changes which are done by current cluster would be ignored
+    # i.e. no cache invalidation event message would be generated for those changes
+    # cluster-id =
+
+    event-producer {
+      # Queue size in KafkaEventProducer to hold cache invalidation messages before flushing them to Kafka
+      buffer-size = 100
+    }
+  }
+}
diff --git a/core/cosmosdb/cache-invalidator/src/main/resources/whisk-logback.xml b/core/cosmosdb/cache-invalidator/src/main/resources/whisk-logback.xml
new file mode 100644
index 0000000..70936ac
--- /dev/null
+++ b/core/cosmosdb/cache-invalidator/src/main/resources/whisk-logback.xml
@@ -0,0 +1,27 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<included>
+    <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
+        <resetJUL>true</resetJUL>
+    </contextListener>
+    <!-- Disable verbose info logging from processor library-->
+    <logger name="com.microsoft.azure.documentdb.changefeedprocessor.services" level="WARN" />
+    <logger name="com.microsoft.azure.documentdb.changefeedprocessor.internal" level="WARN" />
+
+    <logger name="org.apache.openwhisk.core.database.cosmosdb.cache.WhiskChangeEventObserver" level="DEBUG" />
+</included>
\ No newline at end of file
diff --git a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
new file mode 100644
index 0000000..e318952
--- /dev/null
+++ b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openwhisk.core.database.cosmosdb.cache
+
+import akka.actor.{ActorSystem, CoordinatedShutdown}
+import akka.event.slf4j.SLF4JLogging
+import akka.kafka.ProducerSettings
+import akka.stream.ActorMaterializer
+import com.typesafe.config.Config
+import org.apache.kafka.common.serialization.StringSerializer
+import org.slf4j.bridge.SLF4JBridgeHandler
+
+import scala.concurrent.Future
+import scala.util.Success
+
+object CacheInvalidator extends SLF4JLogging {
+  //CosmosDB changefeed support uses Java Logging (java.util.logging).
+  // Its log records need to be routed to Slf4j
+  SLF4JBridgeHandler.removeHandlersForRootLogger()
+  SLF4JBridgeHandler.install()
+
+  //TODO Replace with constant from RemoteCacheInvalidation
+  val cacheInvalidationTopic = "cacheInvalidation"
+
+  val instanceId = "cache-invalidator"
+  val whisksCollection = "whisks"
+
+  def start(globalConfig: Config)(implicit system: ActorSystem, materializer: ActorMaterializer): Unit = {
+    val config = CacheInvalidatorConfig(globalConfig)
+    val producer =
+      KafkaEventProducer(
+        kafkaProducerSettings(defaultProducerConfig(globalConfig)),
+        cacheInvalidationTopic,
+        config.eventProducerConfig)
+    val observer = new WhiskChangeEventObserver(config.invalidatorConfig, producer)
+    val feedManager = new ChangeFeedManager(whisksCollection, observer, config)
+    registerShutdownTasks(system, feedManager, producer)
+    log.info(s"Started the Cache invalidator service. ClusterId [${config.invalidatorConfig.clusterId}]")
+  }
+
+  private def registerShutdownTasks(system: ActorSystem,
+                                    feedManager: ChangeFeedManager,
+                                    producer: KafkaEventProducer): Unit = {
+    CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "closeFeedListeners") { () =>
+      implicit val ec = system.dispatcher
+      Future
+        .successful {
+          feedManager.close()
+        }
+        .flatMap { _ =>
+          producer.close().andThen {
+            case Success(_) =>
+              log.info("Kafka producer successfully shutdown")
+          }
+        }
+    }
+  }
+
+  def kafkaProducerSettings(config: Config): ProducerSettings[String, String] =
+    ProducerSettings(config, new StringSerializer, new StringSerializer)
+
+  def defaultProducerConfig(globalConfig: Config): Config = globalConfig.getConfig("akka.kafka.producer")
+
+}
diff --git a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorConfig.scala b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorConfig.scala
new file mode 100644
index 0000000..ce6eed4
--- /dev/null
+++ b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorConfig.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.cosmosdb.cache
+
+import java.net.URI
+
+import com.microsoft.azure.documentdb.changefeedprocessor.{DocumentCollectionInfo => JDocumentCollectionInfo}
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigUtil.joinPath
+import pureconfig.loadConfigOrThrow
+
+import scala.concurrent.duration.FiniteDuration
+
+case class DocumentCollectionInfo(connectionInfo: ConnectionInfo, collectionName: String) {
+
+  def asJava: JDocumentCollectionInfo = {
+    val info = new JDocumentCollectionInfo
+    info.setUri(new URI(connectionInfo.endpoint))
+    info.setDatabaseName(connectionInfo.db)
+    info.setCollectionName(collectionName)
+    info.setMasterKey(connectionInfo.key)
+    info
+  }
+}
+
+case class ConnectionInfo(endpoint: String, key: String, db: String)
+
+case class FeedConfig(hostname: String, leaseCollection: String)
+
+case class EventProducerConfig(bufferSize: Int)
+
+case class InvalidatorConfig(port: Int, feedPublishTimeout: FiniteDuration, clusterId: Option[String])
+
+case class CacheInvalidatorConfig(globalConfig: Config) {
+  val configRoot = "whisk.cache-invalidator"
+  val cosmosConfigRoot = s"$configRoot.cosmosdb"
+  val eventConfigRoot = s"$configRoot.event-producer"
+  val connections = "collections"
+  val feedConfig: FeedConfig = loadConfigOrThrow[FeedConfig](globalConfig.getConfig(cosmosConfigRoot))
+  val eventProducerConfig: EventProducerConfig =
+    loadConfigOrThrow[EventProducerConfig](globalConfig.getConfig(eventConfigRoot))
+  val invalidatorConfig: InvalidatorConfig = loadConfigOrThrow[InvalidatorConfig](globalConfig.getConfig(configRoot))
+
+  def getCollectionInfo(name: String): DocumentCollectionInfo = {
+    val config = globalConfig.getConfig(cosmosConfigRoot)
+    val specificConfigPath = joinPath(connections, name)
+
+    //Merge config specific to entity with common config
+    val entityConfig = if (config.hasPath(specificConfigPath)) {
+      config.getConfig(specificConfigPath).withFallback(config)
+    } else {
+      config
+    }
+
+    val info = loadConfigOrThrow[ConnectionInfo](entityConfig)
+    DocumentCollectionInfo(info, name)
+  }
+}
diff --git a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedListener.scala b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedListener.scala
new file mode 100644
index 0000000..48c1f93
--- /dev/null
+++ b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedListener.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openwhisk.core.database.cosmosdb.cache
+
+import java.io.Closeable
+import java.util
+
+import com.microsoft.azure.documentdb.changefeedprocessor.{
+  ChangeFeedEventHost,
+  ChangeFeedHostOptions,
+  ChangeFeedObserverCloseReason,
+  ChangeFeedObserverContext,
+  IChangeFeedObserver
+}
+import com.microsoft.azure.documentdb.{ChangeFeedOptions, Document}
+import org.apache.openwhisk.common.ExecutorCloser
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable.Seq
+
+class ChangeFeedManager(collName: String, observer: ChangeFeedObserver, config: CacheInvalidatorConfig)
+    extends Closeable {
+  private val listener = {
+    val collInfo = config.getCollectionInfo(collName)
+    val leaseCollInfo = config.getCollectionInfo(config.feedConfig.leaseCollection)
+    new ChangeFeedListener(collInfo, leaseCollInfo, config.feedConfig, observer, config.invalidatorConfig.clusterId)
+  }
+
+  override def close(): Unit = listener.close()
+}
+
+class ChangeFeedListener(collInfo: DocumentCollectionInfo,
+                         leaseCollInfo: DocumentCollectionInfo,
+                         feedConfig: FeedConfig,
+                         observer: ChangeFeedObserver,
+                         clusterId: Option[String])
+    extends Closeable {
+  private val host = {
+    val feedOpts = new ChangeFeedOptions
+    feedOpts.setPageSize(100)
+
+    val hostOpts = new ChangeFeedHostOptions
+    //The same lease collection is used for all monitored collections. To avoid collisions
+    //set the lease prefix to the collection name. Also include the clusterId so that multiple clusters
+    //can share the same lease collection
+    val prefix = clusterId.map(id => s"$id-${collInfo.collectionName}").getOrElse(collInfo.collectionName)
+    hostOpts.setLeasePrefix(prefix)
+
+    val host = new ChangeFeedEventHost(feedConfig.hostname, collInfo.asJava, leaseCollInfo.asJava, feedOpts, hostOpts)
+    host.registerObserverFactory(() => observer)
+    host
+  }
+
+  override def close(): Unit = ExecutorCloser(host.getExecutorService).close()
+}
+
+abstract class ChangeFeedObserver extends IChangeFeedObserver { // adapts the Java observer callbacks to `process`
+  override final def open(context: ChangeFeedObserverContext): Unit = ()
+  override final def close(context: ChangeFeedObserverContext, reason: ChangeFeedObserverCloseReason): Unit = ()
+  override final def processChanges(context: ChangeFeedObserverContext, docs: util.List[Document]): Unit =
+    process(context, docs.asScala.toList)
+  def process(context: ChangeFeedObserverContext, doc: Seq[Document]): Unit // receives each batch as an immutable Seq
+}
diff --git a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
new file mode 100644
index 0000000..88a26e0
--- /dev/null
+++ b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.cosmosdb.cache
+
+import akka.Done
+import akka.actor.ActorSystem
+import akka.kafka.scaladsl.Producer
+import akka.kafka.{ProducerMessage, ProducerSettings}
+import akka.stream.scaladsl.{Keep, Sink, Source}
+import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import scala.collection.immutable.Seq
+import scala.concurrent.{ExecutionContext, Future, Promise}
+
+case class KafkaEventProducer(
+  settings: ProducerSettings[String, String],
+  topic: String,
+  eventProducerConfig: EventProducerConfig)(implicit system: ActorSystem, materializer: ActorMaterializer)
+    extends EventProducer {
+  private implicit val executionContext: ExecutionContext = system.dispatcher
+
+  //Every queued batch carries the promise which gets completed once the result for that batch is emitted
+  private val queue = Source
+    .queue[(Seq[String], Promise[Done])](eventProducerConfig.bufferSize, OverflowStrategy.dropNew) //TODO Use backpressure
+    .map { case (batch, ack) => ProducerMessage.multi(batch.map(newRecord), ack) }
+    .via(Producer.flexiFlow(settings))
+    .map {
+      case ProducerMessage.MultiResult(_, ack) => ack.success(Done)
+      case _                                   => //As we use multi mode only other modes need not be handled
+    }
+    .toMat(Sink.ignore)(Keep.left)
+    .run
+
+  //Offers the batch to the queue. The result completes via the batch promise or fails if the offer is rejected
+  override def send(msg: Seq[String]): Future[Done] = {
+    val ack = Promise[Done]
+    queue.offer((msg, ack)).flatMap {
+      case QueueOfferResult.Enqueued    => ack.future
+      case QueueOfferResult.Dropped     => Future.failed(new Exception("Kafka request queue is full."))
+      case QueueOfferResult.QueueClosed => Future.failed(new Exception("Kafka request queue was closed."))
+      case QueueOfferResult.Failure(f)  => Future.failed(f)
+    }
+  }
+
+  //Completes the queue and returns the future obtained from watching the queue completion
+  def close(): Future[Done] = {
+    queue.complete()
+    queue.watchCompletion()
+  }
+
+  //All records go to the configured topic under the fixed key "messages"
+  private def newRecord(msg: String) = new ProducerRecord[String, String](topic, "messages", msg)
+}
diff --git a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
new file mode 100644
index 0000000..7b3a7f2
--- /dev/null
+++ b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.cosmosdb.cache
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import kamon.Kamon
+import org.apache.openwhisk.common.ConfigMXBean
+import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
+
+object Main { //Entry point of the cache invalidator service
+  def main(args: Array[String]): Unit = {
+    implicit val system: ActorSystem = ActorSystem("cache-invalidator-actor-system")
+    implicit val materializer: ActorMaterializer = ActorMaterializer()
+    ConfigMXBean.register()
+    Kamon.loadReportersFromConfig()
+    val port = CacheInvalidatorConfig(system.settings.config).invalidatorConfig.port
+    //Serves the BasicRasService route on the configured port. TODO HTTPS for ping/metric endpoint?
+    BasicHttpService.startHttpService(new BasicRasService {}.route, port, None)
+    CacheInvalidator.start(system.settings.config) //the invalidator gets the config of the actor system
+  }
+}
diff --git a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserver.scala b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserver.scala
new file mode 100644
index 0000000..184ba75
--- /dev/null
+++ b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserver.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.cosmosdb.cache
+
+import akka.Done
+import akka.event.slf4j.SLF4JLogging
+import com.microsoft.azure.documentdb.Document
+import com.microsoft.azure.documentdb.changefeedprocessor.ChangeFeedObserverContext
+import kamon.metric.MeasurementUnit
+import org.apache.openwhisk.common.{LogMarkerToken, MetricEmitter}
+import org.apache.openwhisk.core.database.CacheInvalidationMessage
+import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants
+import org.apache.openwhisk.core.entity.CacheKey
+import org.apache.openwhisk.core.database.cosmosdb.CosmosDBUtil.unescapeId
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.immutable.Seq
+import scala.concurrent.{Await, Future}
+
+class WhiskChangeEventObserver(config: InvalidatorConfig, eventProducer: EventProducer) extends ChangeFeedObserver {
+  import WhiskChangeEventObserver._
+
+  override def process(context: ChangeFeedObserverContext, docs: Seq[Document]): Unit = {
+    //Each observer is called from a pool managed by CosmosDB ChangeFeedProcessor, so it is fine to
+    //have a blocking wait. If this fails then the batch would be reread and retried, thus ensuring
+    //at-least-once semantics
+    Await.result(eventProducer.send(processDocs(docs, config)), config.feedPublishTimeout)
+    MetricEmitter.emitCounterMetric(feedCounter, docs.size)
+    //An empty batch has no last document, so skip the lag metric instead of failing on docs.last
+    docs.lastOption.foreach(recordLag(context, _))
+  }
+}
+
+trait EventProducer { //Destination for the serialized events generated from the change feed
+  def send(msg: Seq[String]): Future[Done] //msg is the batch to publish; the future reports the outcome of the send
+}
+
+object WhiskChangeEventObserver extends SLF4JLogging {
+  //Used as the instanceId of every CacheInvalidationMessage generated here
+  val instanceId = "cache-invalidator"
+  //Counter token for the documents received via the change feed
+  private val feedCounter =
+    LogMarkerToken("cosmosdb", "change_feed", "count", tags = Map("collection" -> "whisks"))(MeasurementUnit.none)
+  //Lag gauge tokens keyed by partition key range id, created on first use
+  private val lags = new TrieMap[String, LogMarkerToken]
+
+  /**
+   * Records the current lag on per partition basis. The lag is the LSN carried by the session token of the
+   * feed response minus the `_lsn` of the last document. In ideal cases the lag should not continue to increase.
+   * Fails with an IllegalArgumentException if the document has no `_lsn` property
+   */
+  def recordLag(context: ChangeFeedObserverContext, lastDoc: Document): Unit = {
+    val sessionToken = context.getFeedResponde.getSessionToken
+    val lsnRef = lastDoc.get("_lsn")
+    require(lsnRef != null, s"No lsn defined in document $lastDoc")
+
+    val lsn = lsnRef.toString.toLong
+    val lag = getSessionLsn(sessionToken) - lsn
+    val partitionKey = context.getPartitionKeyRangeId
+    MetricEmitter.emitGaugeMetric(lags.getOrElseUpdate(partitionKey, createLagToken(partitionKey)), lag)
+  }
+
+  //Creates the gauge token used to report the lag of the given partition
+  private def createLagToken(partitionKey: String) =
+    LogMarkerToken("cosmosdb", "change_feed", "lag", tags = Map("collection" -> "whisks", "pk" -> partitionKey))(
+      MeasurementUnit.none)
+
+  /**
+   * Extracts the LSN from a session token. Session Token can be in two formats. Either
+   * {PartitionKeyRangeId}:{LSN} or {PartitionKeyRangeId}:{Version}#{GlobalLSN}
+   * See https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet/pull/113/files#diff-54cbd8ddcc33cab4120c8af04869f881
+   */
+  def getSessionLsn(token: String): Long = {
+    val segments = token.substring(token.indexOf(":") + 1).split("#")
+    //A single segment is the LSN itself, otherwise the LSN is the segment following the version
+    (if (segments.length > 1) segments(1) else segments(0)).toLong
+  }
+
+  /**
+   * Converts the changed docs into serialized cache invalidation messages. If a clusterId is configured
+   * then docs carrying that same clusterId are dropped. Docs without a clusterId are always included
+   */
+  def processDocs(docs: Seq[Document], config: InvalidatorConfig): Seq[String] = {
+    //only if current clusterId is configured do a check
+    def isRelevant(doc: Document) =
+      config.clusterId.forall(id => !Option(doc.getString(CosmosDBConstants.clusterId)).contains(id))
+
+    docs.filter(isRelevant).map { doc =>
+      val id = unescapeId(doc.getId)
+      log.debug("Changed doc [{}]", id)
+      CacheInvalidationMessage(CacheKey(id), instanceId).serialize
+    }
+  }
+
+}
diff --git a/settings.gradle b/settings.gradle
index 40c59bb..a6eb3da 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -19,6 +19,7 @@ include 'common:scala'
 
 include 'core:controller'
 include 'core:invoker'
+include 'core:cosmosdb:cache-invalidator'
 include 'core:standalone'
 
 include 'tests'
diff --git a/tests/build.gradle b/tests/build.gradle
index 48fc925..8f43ea6 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -185,12 +185,14 @@ dependencies {
     compile 'io.opentracing:opentracing-mock:0.31.0'
     compile "org.apache.curator:curator-test:${gradle.curator.version}"
     compile 'com.atlassian.oai:swagger-request-validator-core:1.4.5'
+    compile 'com.typesafe.akka:akka-stream-kafka-testkit_2.12:1.0'
 
     compile "com.amazonaws:aws-java-sdk-s3:1.11.295"
 
     compile project(':common:scala')
     compile project(':core:controller')
     compile project(':core:invoker')
+    compile project(':core:cosmosdb:cache-invalidator')
     compile project(':tools:admin')
 
 
@@ -281,7 +283,8 @@ task reportCoverage(type: ScoverageReport) {
         ':common:scala:reportScoverage',
         ':core:controller:reportScoverage',
         ':core:invoker:reportScoverage',
-        ':tools:admin:reportScoverage'
+        ':tools:admin:reportScoverage',
+        ':core:cosmosdb:cache-invalidator:reportScoverage',
     ])
 
 }
@@ -340,7 +343,8 @@ def getScoverageClasspath(Project project) {
         ':common:scala',
         ':core:controller',
         ':core:invoker',
-        ':tools:admin'
+        ':tools:admin',
+        ':core:cosmosdb:cache-invalidator'
     ]
     def combinedClasspath = projectNames.inject(project.files([])) { result, name ->
         def cp = project.project(name).sourceSets.scoverage.runtimeClasspath
diff --git a/tests/src/test/resources/logback-test.xml b/tests/src/test/resources/logback-test.xml
index ee750fa..40a1653 100644
--- a/tests/src/test/resources/logback-test.xml
+++ b/tests/src/test/resources/logback-test.xml
@@ -19,6 +19,12 @@
 
     <logger name="akka.event.slf4j.Slf4jLogger" level="WARN" />
 
+    <!-- Disable verbose info logging from processor library-->
+    <logger name="com.microsoft.azure.documentdb.changefeedprocessor.services" level="WARN" />
+    <logger name="com.microsoft.azure.documentdb.changefeedprocessor.internal" level="WARN" />
+
+    <logger name="org.apache.openwhisk.core.database.cosmosdb.cache" level="DEBUG" />
+
     <root level="${logback.log.level:-INFO}">
         <appender-ref ref="console" />
     </root>
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBTestSupport.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBTestSupport.scala
index 4c50102..6a70555 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBTestSupport.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBTestSupport.scala
@@ -18,7 +18,7 @@
 package org.apache.openwhisk.core.database.cosmosdb
 
 import com.microsoft.azure.cosmosdb.{Database, SqlParameter, SqlParameterCollection, SqlQuerySpec}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec}
+import org.scalatest.{BeforeAndAfterAll, FlatSpecLike}
 import pureconfig.loadConfigOrThrow
 import org.apache.openwhisk.core.ConfigKeys
 import org.apache.openwhisk.core.database.test.behavior.ArtifactStoreTestUtil.storeAvailable
@@ -26,7 +26,7 @@ import org.apache.openwhisk.core.database.test.behavior.ArtifactStoreTestUtil.st
 import scala.collection.mutable.ListBuffer
 import scala.util.{Random, Try}
 
-trait CosmosDBTestSupport extends FlatSpec with BeforeAndAfterAll with RxObservableImplicits {
+trait CosmosDBTestSupport extends FlatSpecLike with BeforeAndAfterAll with RxObservableImplicits {
   private val dbsToDelete = ListBuffer[Database]()
 
   lazy val storeConfigTry = Try { loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb) }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
new file mode 100644
index 0000000..8ad122c
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.cosmosdb.cache
+import akka.actor.CoordinatedShutdown
+import akka.kafka.testkit.internal.TestFrameworkInterface
+import akka.kafka.testkit.scaladsl.{EmbeddedKafkaLike, ScalatestKafkaSpec}
+import akka.stream.ActorMaterializer
+import com.typesafe.config.ConfigFactory
+import common.StreamLogging
+import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.database.CacheInvalidationMessage
+import org.apache.openwhisk.core.database.cosmosdb.{CosmosDBArtifactStoreProvider, CosmosDBTestSupport}
+import org.apache.openwhisk.core.entity.{
+  DocumentReader,
+  EntityName,
+  EntityPath,
+  WhiskDocumentReader,
+  WhiskEntity,
+  WhiskEntityJsonFormat,
+  WhiskPackage
+}
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{Matchers, TryValues}
+
+import scala.concurrent.duration._
+import scala.util.Random
+
+//TODO Make test port dynamic
+@RunWith(classOf[JUnitRunner])
+class CacheInvalidatorTests
+    extends ScalatestKafkaSpec(6061)
+    with EmbeddedKafkaLike
+    with EmbeddedKafka
+    with CosmosDBTestSupport
+    with Matchers
+    with ScalaFutures
+    with IntegrationPatience
+    with TryValues
+    with TestFrameworkInterface.Scalatest
+    with StreamLogging {
+  //Stores a package in a fresh CosmosDB test db and expects its invalidation event on the Kafka topic
+  implicit val materializer: ActorMaterializer = ActorMaterializer()
+
+  override def createKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort, zooKeeperPort)
+
+  behavior of "CosmosDB CacheInvalidation"
+
+  private val server = s"localhost:$kafkaPort"
+  private var dbName: String = _ //assigned by the test before startCacheInvalidator() reads it
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    CoordinatedShutdown(system).run(CoordinatedShutdown.ClusterDowningReason)
+    shutdown()
+  }
+
+  it should "send event upon entity change" in {
+    implicit val tid = TransactionId.testing
+    implicit val docReader: DocumentReader = WhiskDocumentReader
+    implicit val format = WhiskEntityJsonFormat
+    dbName = createTestDB().getId
+    val dbConfig = storeConfig.copy(db = dbName)
+    val store = CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskEntity](dbConfig, None)
+    val pkg = WhiskPackage(EntityPath("cacheInvalidationTest"), EntityName(randomString()))
+
+    //Store a package in the test db so that the change feed has an entry to report
+    val info = store.put(pkg).futureValue
+    log.info(s"Added document ${info.id}")
+
+    //Start cache invalidator after the db for whisks is created
+    startCacheInvalidator()
+    //This should result in change feed trigger and event to kafka topic
+    val topic = "cacheInvalidation"
+    val msgs =
+      consumeNumberMessagesFromTopics(Set(topic), 1, timeout = 60.seconds)(createKafkaConfig, new StringDeserializer())(
+        topic)
+    //The first message consumed must refer to the package stored above
+    CacheInvalidationMessage.parse(msgs.head).get.key.mainId shouldBe pkg.docid.asString
+
+    store.del(info).futureValue
+  }
+
+  private def randomString() = Random.alphanumeric.take(5).mkString
+
+  private def startCacheInvalidator() = { //overrides the Kafka bootstrap server and the db name for the invalidator
+    val tsconfig = ConfigFactory.parseString(s"""
+      |akka.kafka.producer {
+      |  kafka-clients {
+      |    bootstrap.servers = "$server"
+      |  }
+      |}
+      |whisk {
+      |  cache-invalidator {
+      |    cosmosdb {
+      |      db = "$dbName"
+      |    }
+      |  }
+      |}
+      """.stripMargin).withFallback(ConfigFactory.load())
+    CacheInvalidator.start(tsconfig)
+  }
+
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserverTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserverTests.scala
new file mode 100644
index 0000000..a372fa3
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserverTests.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.cosmosdb.cache
+import com.microsoft.azure.documentdb.Document
+import org.apache.openwhisk.core.database.CacheInvalidationMessage
+import org.apache.openwhisk.core.entity.CacheKey
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+import spray.json.DefaultJsonProtocol
+
+import scala.collection.immutable.Seq
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class WhiskChangeEventObserverTests extends FlatSpec with Matchers {
+  import WhiskChangeEventObserver.{getSessionLsn, instanceId, processDocs}
+
+  behavior of "CosmosDB extract LSN from Session token"
+
+  it should "parse old session token" in {
+    getSessionLsn("0:12345") shouldBe 12345
+  }
+
+  it should "parse new session token" in {
+    getSessionLsn("0:-1#12345") shouldBe 12345
+  }
+
+  it should "parse new session token with multiple regional lsn" in {
+    getSessionLsn("0:-1#12345#Region1=1#Region2=2") shouldBe 12345
+  }
+
+  behavior of "CosmosDB feed events"
+
+  it should "generate cache events" in {
+    //Without a configured clusterId every doc results in an event
+    val events = processDocs(Seq(createDoc("foo"), createDoc("bar")), invalidatorConfig(None))
+
+    parsed(events) shouldBe invalidationsOf("foo", "bar")
+  }
+
+  it should "filter clusterId" in {
+    val docs = Seq(createDoc("foo", Some("cid2")), createDoc("bar", Some("cid1")), createDoc("baz"))
+    val events = processDocs(docs, invalidatorConfig(Some("cid1")))
+
+    //Should not include bar as the clusterId matches
+    parsed(events) shouldBe invalidationsOf("foo", "baz")
+  }
+
+  private def invalidatorConfig(clusterId: Option[String]) = InvalidatorConfig(8080, 60.seconds, clusterId)
+
+  private def parsed(events: Seq[String]) = events.map(CacheInvalidationMessage.parse(_).get)
+
+  //Messages expected for the given doc ids
+  private def invalidationsOf(ids: String*) = ids.toList.map(id => CacheInvalidationMessage(CacheKey(id), instanceId))
+
+  //Builds a feed document from the json form of CosmosDBDoc
+  private def createDoc(id: String, clusterId: Option[String] = None): Document =
+    new Document(CosmosDBDoc.seredes.write(CosmosDBDoc(id, clusterId)).compactPrint)
+
+  //Minimal shape of a stored doc: id, optional cluster id and the `_lsn` property
+  case class CosmosDBDoc(id: String, _clusterId: Option[String], _lsn: Int = 42)
+
+  object CosmosDBDoc extends DefaultJsonProtocol {
+    implicit val seredes = jsonFormat3(CosmosDBDoc.apply)
+  }
+
+}