Posted to commits@openwhisk.apache.org by st...@apache.org on 2020/03/03 14:00:16 UTC

[openwhisk] branch master updated: Implement an ElasticSearchActivationStore (#4724)

This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 05ed4e1  Implement an ElasticSearchActivationStore (#4724)
05ed4e1 is described below

commit 05ed4e18eb53f5630735ffbe28726dce5a3481b5
Author: jiangpch <ji...@navercorp.com>
AuthorDate: Tue Mar 3 22:00:03 2020 +0800

    Implement an ElasticSearchActivationStore (#4724)
    
    * Implement an ElasticSearchActivationStore
    
    * Remove short headers and mismatched comments
    
    * typo fix
    
    * Use TestContainers
    
    * Neat changes
    
    * Update build.gradle
    
    * Fix TestContainers
    
    * fix mistake
    
    * Remove cache and add configuration example
    
    * Fix pureconfig
    
    * Apply review comments
    
    * Add result and annotations to mock WhiskActivations
    
    * Add instructions about how to use it
    
    * Update ansible/README.md
    
    * Remove vagrant changes
    
    * Add elasticsearch to `redo` and remove whitespace
    
    * Fix tests
---
 ansible/README.md                                  |  40 ++
 .../runUnitTests.sh => ansible/elasticsearch.yml   |  38 +-
 ansible/environments/docker-machine/hosts.j2.ini   |   3 +
 ansible/environments/local/hosts.j2.ini            |   3 +
 ansible/group_vars/all                             |  30 ++
 ansible/roles/controller/tasks/deploy.yml          |  16 +
 ansible/roles/elasticsearch/tasks/clean.yml        |  39 ++
 ansible/roles/elasticsearch/tasks/deploy.yml       |  94 +++++
 .../roles/elasticsearch/tasks/main.yml             |  26 +-
 .../elasticsearch/templates/elasticsearch.yml.j2   |  23 ++
 .../elasticsearch/templates/log4j2.properties.j2   |  10 +
 ansible/roles/invoker/tasks/deploy.yml             |  16 +
 common/scala/build.gradle                          |   2 +
 common/scala/src/main/resources/application.conf   |  15 +
 .../org/apache/openwhisk/core/WhiskConfig.scala    |   1 +
 .../ElasticSearchActivationStore.scala             | 417 +++++++++++++++++++++
 tests/build.gradle                                 |   1 +
 tests/src/test/resources/application.conf.j2       |   4 +
 .../ElasticSearchActivationStoreBehaviorBase.scala |  58 +++
 .../ElasticSearchActivationStoreTests.scala        |  66 ++++
 .../behavior/ActivationStoreCRUDBehaviors.scala    |  21 +-
 tools/build/README.md                              |   1 +
 tools/build/redo                                   |   4 +
 tools/travis/runUnitTests.sh                       |   1 +
 24 files changed, 886 insertions(+), 43 deletions(-)

diff --git a/ansible/README.md b/ansible/README.md
index 25955e5..5a685e6 100644
--- a/ansible/README.md
+++ b/ansible/README.md
@@ -191,6 +191,46 @@ ansible-playbook -i environments/<environment> routemgmt.yml
 - To use the API Gateway, you'll need to run `apigateway.yml` and `routemgmt.yml`.
 - Use `ansible-playbook -i environments/<environment> openwhisk.yml` to avoid wiping the data store. This is useful to start OpenWhisk after restarting your Operating System.
 
+### Using ElasticSearch to Store Activations
+
+You can use ElasticSearch (ES) to store activations separately, while other entities remain in CouchDB. An Ansible playbook is provided to set up a simple ES cluster for testing and development purposes.
+
+-   Provide your custom ES-related Ansible arguments:
+
+```
+elastic_protocol="http"
+elastic_index_pattern="openwhisk-%s" // combined with the namespace name so each namespace gets its own index; see the sketch after this diff
+elastic_base_volume="esdata" // name of the docker volume that stores ES data
+elastic_cluster_name="openwhisk"
+elastic_java_opts="-Xms1g -Xmx1g"
+elastic_loglevel="INFO"
+elastic_username="admin"
+elastic_password="admin"
+elasticsearch_connect_string="x.x.x.x:9200,y.y.y.y:9200" // set this if you want to use an external ES cluster
+```
+
+-  Then execute:
+
+```
+cd <openwhisk_home>
+./gradlew distDocker
+cd ansible
+# couchdb is still needed to store subjects and actions
+ansible-playbook -i environments/<environment> couchdb.yml
+ansible-playbook -i environments/<environment> initdb.yml
+ansible-playbook -i environments/<environment> wipe.yml
+# this deploys a simple ES cluster; skip it if you use an external ES cluster
+ansible-playbook -i environments/<environment> elasticsearch.yml
+ansible-playbook -i environments/<environment> openwhisk.yml -e db_activation_backend=ElasticSearch
+
+# installs a catalog of public packages and actions
+ansible-playbook -i environments/<environment> postdeploy.yml
+
+# to use the API gateway
+ansible-playbook -i environments/<environment> apigateway.yml
+ansible-playbook -i environments/<environment> routemgmt.yml
+```
+
 ### Configuring the installation of `wsk` CLI
 There are two installation modes to install `wsk` CLI: remote and local.
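
A note on the `elastic_index_pattern` argument in the ES section above: it behaves like Scala's `String.format`, filled with the lower-cased namespace. A minimal sketch, mirroring the `generateIndex` helper added later in this commit:

```scala
// Sketch of how elastic_index_pattern maps a namespace to an index name.
// Mirrors ElasticSearchActivationStore.generateIndex in this commit.
def generateIndex(indexPattern: String, namespace: String): String =
  indexPattern.dropWhile(_ == '/').format(namespace.toLowerCase)

generateIndex("openwhisk-%s", "whisk.system") // => "openwhisk-whisk.system"
generateIndex("openwhisk", "guest")           // => "openwhisk" (one shared index)
```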
 
diff --git a/tools/travis/runUnitTests.sh b/ansible/elasticsearch.yml
old mode 100755
new mode 100644
similarity index 52%
copy from tools/travis/runUnitTests.sh
copy to ansible/elasticsearch.yml
index 762aea6..ed05e3d
--- a/tools/travis/runUnitTests.sh
+++ b/ansible/elasticsearch.yml
@@ -1,5 +1,3 @@
-#!/usr/bin/env bash
-
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -16,21 +14,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-set -e
-
-SCRIPTDIR=$(cd $(dirname "$0") && pwd)
-ROOTDIR="$SCRIPTDIR/../.."
-
-cd $ROOTDIR/tools/travis
-export ORG_GRADLE_PROJECT_testSetName="REQUIRE_ONLY_DB"
-
-./scan.sh
-
-./setupPrereq.sh
-
-cat "$ROOTDIR/tests/src/test/resources/application.conf"
-
-./distDocker.sh
-
-./runTests.sh
+---
+# This playbook deploys an ElasticSearch cluster
+
+- hosts: elasticsearch
+  vars:
+    #
+    # host_group - usually "{{ groups['...'] }}" where '...' is what was used
+    #   for 'hosts' above.  The hostname of each host will be looked up in this
+    #   group to assign a zero-based index.  That index will be used in concert
+    #   with 'name_prefix' below to assign a host/container name.
+    host_group: "{{ groups['elasticsearch'] }}"
+    #
+    # name_prefix - a unique prefix for this set of ElasticSearch nodes.  The prefix
+    # will be used in combination with an index (determined using
+    # 'host_group' above) to name each host/container.
+    name_prefix: "elasticsearch"
+  roles:
+  - elasticsearch
diff --git a/ansible/environments/docker-machine/hosts.j2.ini b/ansible/environments/docker-machine/hosts.j2.ini
index 1a49698..45f327b 100644
--- a/ansible/environments/docker-machine/hosts.j2.ini
+++ b/ansible/environments/docker-machine/hosts.j2.ini
@@ -36,6 +36,9 @@ invoker1                    ansible_host={{ docker_machine_ip }}
 [apigateway]
 {{ docker_machine_ip }}     ansible_host={{ docker_machine_ip }}
 
+[elasticsearch:children]
+db
+
 ; define variables
 [all:vars]
 ansible_connection=ssh
diff --git a/ansible/environments/local/hosts.j2.ini b/ansible/environments/local/hosts.j2.ini
index 210df4c..7512d966 100644
--- a/ansible/environments/local/hosts.j2.ini
+++ b/ansible/environments/local/hosts.j2.ini
@@ -31,6 +31,9 @@ invoker1            ansible_host=172.17.0.1 ansible_connection=local
 [db]
 172.17.0.1          ansible_host=172.17.0.1 ansible_connection=local
 
+[elasticsearch:children]
+db
+
 [redis]
 172.17.0.1          ansible_host=172.17.0.1 ansible_connection=local
 
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 7fb49c0..545f1b2 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -272,6 +272,27 @@ db:
     invoker:
       user: "{{ db_invoker_user | default(lookup('ini', 'db_username section=invoker file={{ playbook_dir }}/db_local.ini')) }}"
       pass: "{{ db_invoker_pass | default(lookup('ini', 'db_password section=invoker file={{ playbook_dir }}/db_local.ini')) }}"
+  activation_store:
+    backend: "{{ db_activation_backend | default('CouchDB') }}"
+  elasticsearch:
+    protocol: "{{ elastic_protocol | default('http') }}"
+    port: 9200
+    index_pattern: "{{ elastic_index_pattern | default('openwhisk-%s') }}"
+    base_transport_port: 9300
+    confdir: "{{ config_root_dir }}/elasticsearch"
+    dir:
+      become: "{{ elastic_dir_become | default(false) }}"
+    base_volume: "{{ elastic_base_volume | default('esdata') }}"
+    cluster_name: "{{ elastic_cluster_name | default('openwhisk') }}"
+    java_opts: "{{ elastic_java_opts | default('-Xms1g -Xmx1g') }}"
+    loglevel: "{{ elastic_loglevel | default('INFO') }}"
+    # the user id of the elasticsearch process; the default is 1000. If you have enabled user
+    # namespaces for the docker daemon, this needs to be changed correspondingly
+    uid: "{{ elastic_uid | default(1000) }}"
+    auth:
+      admin:
+        username: "{{ elastic_username | default('admin') }}"
+        password: "{{ elastic_password | default('admin') }}"
 
 apigateway:
   port:
@@ -291,6 +312,15 @@ linux:
 couchdb:
   version: 2.3
 
+elasticsearch:
+  version: 6.7.2
+
+elasticsearch_connect_string: "{% set ret = [] %}\
+                               {% for host in groups['elasticsearch'] %}\
+                               {{ ret.append( hostvars[host].ansible_host + ':' + ((db.elasticsearch.port+loop.index-1)|string) ) }}\
+                               {% endfor %}\
+                               {{ ret | join(',') }}"
+
 docker:
   # The user to install docker for. Defaults to the ansible user if not set. This will be the user who is able to run
   # docker commands on a machine setup with prereq_build.yml
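
The `elasticsearch_connect_string` template above assigns each host in the `elasticsearch` group a port offset by its position in the group (9200, 9201, ...), matching the per-host `http_port` computed in the deploy task. A rough Scala equivalent of that Jinja loop:

```scala
// Sketch of the connect-string construction: base port + zero-based host index.
def connectString(hosts: Seq[String], basePort: Int = 9200): String =
  hosts.zipWithIndex
    .map { case (host, i) => s"$host:${basePort + i}" }
    .mkString(",")

connectString(Seq("172.17.0.1", "172.17.0.2")) // => "172.17.0.1:9200,172.17.0.2:9201"
```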
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 7e5ec60..11d3a9d 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -275,6 +275,22 @@
   set_fact:
     env: "{{ env | combine(controller.extraEnv) }}"
 
+- name: setup elasticsearch activation store env
+  set_fact:
+    elastic_env:
+      "CONFIG_whisk_activationStore_elasticsearch_protocol": "{{ db.elasticsearch.protocol}}"
+      "CONFIG_whisk_activationStore_elasticsearch_hosts": "{{ elasticsearch_connect_string }}"
+      "CONFIG_whisk_activationStore_elasticsearch_indexPattern": "{{ db.elasticsearch.index_pattern }}"
+      "CONFIG_whisk_activationStore_elasticsearch_username": "{{ db.elasticsearch.auth.admin.username }}"
+      "CONFIG_whisk_activationStore_elasticsearch_password": "{{ db.elasticsearch.auth.admin.password }}"
+      "CONFIG_whisk_spi_ActivationStoreProvider": "org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStoreProvider"
+  when: db.activation_store.backend == "ElasticSearch"
+
+- name: merge elasticsearch activation store env
+  set_fact:
+    env: "{{ env | combine(elastic_env) }}"
+  when: db.activation_store.backend == "ElasticSearch"
+
 - name: populate volumes for controller
   set_fact:
     controller_volumes:
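
For orientation, the `CONFIG_whisk_activationStore_elasticsearch_*` variables above surface as the `whisk.activation-store.elasticsearch.*` keys that pureconfig reads into the store's config case class (defined later in this commit). A minimal sketch of that load:

```scala
import pureconfig.loadConfigOrThrow
import pureconfig.generic.auto._
import org.apache.openwhisk.core.ConfigKeys

// ConfigKeys.elasticSearchActivationStore is "whisk.activation-store.elasticsearch"
val esConfig =
  loadConfigOrThrow[ElasticSearchActivationStoreConfig](ConfigKeys.elasticSearchActivationStore)
// e.g. esConfig.hosts == "172.17.0.1:9200", esConfig.indexPattern == "openwhisk-%s"
```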
diff --git a/ansible/roles/elasticsearch/tasks/clean.yml b/ansible/roles/elasticsearch/tasks/clean.yml
new file mode 100644
index 0000000..21109b7
--- /dev/null
+++ b/ansible/roles/elasticsearch/tasks/clean.yml
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+---
+# Remove ElasticSearch server
+
+- name: set elasticsearch container name and volume
+  set_fact:
+    elasticsearch_name: "{{ name_prefix ~ host_group.index(inventory_hostname) }}"
+    volume_name: "{{ db.elasticsearch.base_volume ~ host_group.index(inventory_hostname) }}"
+
+- name: remove ElasticSearch
+  vars:
+    elasticsearch_image: "{{ elasticsearch.docker_image | default('docker.elastic.co/elasticsearch/elasticsearch:' ~ elasticsearch.version ) }}"
+  docker_container:
+    name: "{{ elasticsearch_name }}"
+    image: "{{ elasticsearch_image }}"
+    keep_volumes: False
+    state: absent
+  ignore_errors: True
+
+- name: remove ElasticSearch conf dir
+  file:
+    path: "{{ db.elasticsearch.confdir }}/{{ elasticsearch_name }}"
+    state: absent
+  become: "{{ db.elasticsearch.dir.become }}"
diff --git a/ansible/roles/elasticsearch/tasks/deploy.yml b/ansible/roles/elasticsearch/tasks/deploy.yml
new file mode 100644
index 0000000..3728377
--- /dev/null
+++ b/ansible/roles/elasticsearch/tasks/deploy.yml
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+---
+# This role will run an ElasticSearch server on the db group
+
+- name: set the vm.max_map_count to 262144
+  sysctl:
+    name: vm.max_map_count
+    value: '262144'
+  become: true
+
+- name: set elasticsearch container name, volume and port
+  set_fact:
+    elasticsearch_name: "{{ name_prefix ~ host_group.index(inventory_hostname) }}"
+    volume_name: "{{ db.elasticsearch.base_volume ~ host_group.index(inventory_hostname) }}"
+    http_port: "{{ (db.elasticsearch.port|int) + host_group.index(inventory_hostname) }}"
+    transport_port: "{{ (db.elasticsearch.base_transport_port|int) + host_group.index(inventory_hostname) }}"
+
+- name: ensure elasticsearch config directory is created with permissions
+  file:
+    path: "{{ db.elasticsearch.confdir }}/{{ elasticsearch_name }}"
+    state: directory
+    mode: 0755
+  become: "{{ db.elasticsearch.dir.become }}"
+
+# create the volume directory if it's a directory path (not a named volume)
+- name: ensure elasticsearch volume directory is created with permissions
+  file:
+    path: "{{ volume_name }}"
+    state: directory
+    mode: 0700
+    owner: "{{ db.elasticsearch.uid }}"
+  become: true
+  when: volume_name is search("/")
+
+- name: copy elasticsearch config file
+  template:
+    src: "elasticsearch.yml.j2"
+    dest: "{{ db.elasticsearch.confdir }}/{{ elasticsearch_name }}/elasticsearch.yml"
+    mode: 0644
+  become: "{{ db.elasticsearch.dir.become }}"
+
+- name: copy elasticsearch log config file
+  template:
+    src: "log4j2.properties.j2"
+    dest: "{{ db.elasticsearch.confdir }}/{{ elasticsearch_name }}/log4j2.properties"
+    mode: 0644
+  become: "{{ db.elasticsearch.dir.become }}"
+
+- name: "(re)start ElasticSearch from '{{ elasticsearch_image }} ' "
+  vars:
+    elasticsearch_image: "{{ elasticsearch.docker_image | default('docker.elastic.co/elasticsearch/elasticsearch:' ~ elasticsearch.version ) }}"
+  docker_container:
+    name: "{{ elasticsearch_name }}"
+    image: "{{ elasticsearch_image }}"
+    state: started
+    recreate: true
+    restart_policy: "{{ docker.restart.policy }}"
+    ports:
+      - "{{ http_port }}:9200"
+      - "{{ transport_port }}:{{ transport_port }}"
+    volumes:
+      - "{{ db.elasticsearch.confdir }}/{{ elasticsearch_name }}/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml"
+      - "{{ db.elasticsearch.confdir }}/{{ elasticsearch_name }}/log4j2.properties:/usr/share/elasticsearch/config/log4j2.properties"
+      - "{{ volume_name }}:/usr/share/elasticsearch/data"
+    pull: "{{ docker.pull_elasticsearch | default(true) }}"
+    ulimits:
+      - "nofile:262144:262144"
+      - "memlock:-1:-1"
+    env:
+      TZ: "{{ docker.timezone }}"
+      ES_JAVA_OPTS: "{{ db.elasticsearch.java_opts }}"
+
+- name: wait until ElasticSearch on this host is up and running
+  uri:
+    url: "{{ db.elasticsearch.protocol }}://{{ ansible_host }}:{{ http_port }}"
+  register: result
+  until: result.status == 200
+  retries: 12
+  delay: 5
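
As context for the sysctl and ulimit settings above: ElasticSearch's bootstrap checks require `vm.max_map_count` to be at least 262144, and `bootstrap.memory_lock: true` (set in the template below) needs an unlimited `memlock` ulimit, so the container would refuse to start without them.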
diff --git a/tools/travis/runUnitTests.sh b/ansible/roles/elasticsearch/tasks/main.yml
old mode 100755
new mode 100644
similarity index 70%
copy from tools/travis/runUnitTests.sh
copy to ansible/roles/elasticsearch/tasks/main.yml
index 762aea6..e5cc947
--- a/tools/travis/runUnitTests.sh
+++ b/ansible/roles/elasticsearch/tasks/main.yml
@@ -1,5 +1,3 @@
-#!/usr/bin/env bash
-
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -16,21 +14,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+---
+# This role will deploy an ElasticSearch server. Use the role if you want to run ElasticSearch locally.
+# In deploy mode it will start the ElasticSearch container.
+# In clean mode it will remove the ElasticSearch container.
 
-set -e
-
-SCRIPTDIR=$(cd $(dirname "$0") && pwd)
-ROOTDIR="$SCRIPTDIR/../.."
-
-cd $ROOTDIR/tools/travis
-export ORG_GRADLE_PROJECT_testSetName="REQUIRE_ONLY_DB"
-
-./scan.sh
-
-./setupPrereq.sh
-
-cat "$ROOTDIR/tests/src/test/resources/application.conf"
-
-./distDocker.sh
+- import_tasks: deploy.yml
+  when: mode == "deploy"
 
-./runTests.sh
+- import_tasks: clean.yml
+  when: mode == "clean"
diff --git a/ansible/roles/elasticsearch/templates/elasticsearch.yml.j2 b/ansible/roles/elasticsearch/templates/elasticsearch.yml.j2
new file mode 100644
index 0000000..f3129a3
--- /dev/null
+++ b/ansible/roles/elasticsearch/templates/elasticsearch.yml.j2
@@ -0,0 +1,23 @@
+cluster.name: "{{ db.elasticsearch.cluster_name }}"
+node.name: "{{ elasticsearch_name }}"
+network.host: 0.0.0.0
+network.publish_host: {{ ansible_default_ipv4.address }}
+
+http.port: 9200
+transport.tcp.port: {{ transport_port }}
+
+# minimum_master_nodes need to be explicitly set when bound on a public IP
+# set to 1 to allow single node clusters
+# Details: https://github.com/elastic/elasticsearch/pull/17282
+discovery.zen.ping.unicast.hosts:
+{% for es in groups['elasticsearch'] %}
+   - {{ hostvars[es].ansible_host }}:{{ db.elasticsearch.base_transport_port + host_group.index(es)|int }}
+{% endfor %}
+discovery.zen.minimum_master_nodes: {{ (host_group|length / 2 + 1) | int}}
+
+gateway.recover_after_nodes: {{ (host_group|length / 2 + 1) | int }}
+gateway.expected_nodes: {{ host_group|length }}
+gateway.recover_after_time: 5m
+
+xpack.security.enabled: false
+bootstrap.memory_lock: true
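
Reading the quorum arithmetic above: with a three-node `elasticsearch` group, `(3 / 2 + 1) | int` renders `discovery.zen.minimum_master_nodes: 2` and `gateway.recover_after_nodes: 2`, with `gateway.expected_nodes: 3`; a single-node group renders `1`, which is what permits the single-node test clusters mentioned in the comment.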
diff --git a/ansible/roles/elasticsearch/templates/log4j2.properties.j2 b/ansible/roles/elasticsearch/templates/log4j2.properties.j2
new file mode 100644
index 0000000..cc8edde
--- /dev/null
+++ b/ansible/roles/elasticsearch/templates/log4j2.properties.j2
@@ -0,0 +1,10 @@
+status = error
+
+appender.console.type = Console
+appender.console.name = console
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] %marker%m%n
+
+rootLogger.appenderRef.console.ref = console
+
+rootLogger.level = {{ db.elasticsearch.loglevel }}
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 596a202..62361e6 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -290,6 +290,22 @@
   set_fact:
     env: "{{ env | combine(invoker.extraEnv) }}"
 
+- name: setup elasticsearch activation store env
+  set_fact:
+    elastic_env:
+      "CONFIG_whisk_activationStore_elasticsearch_protocol": "{{ db.elasticsearch.protocol}}"
+      "CONFIG_whisk_activationStore_elasticsearch_hosts": "{{ elasticsearch_connect_string }}"
+      "CONFIG_whisk_activationStore_elasticsearch_indexPattern": "{{ db.elasticsearch.index_pattern }}"
+      "CONFIG_whisk_activationStore_elasticsearch_username": "{{ db.elasticsearch.auth.admin.username }}"
+      "CONFIG_whisk_activationStore_elasticsearch_password": "{{ db.elasticsearch.auth.admin.password }}"
+      "CONFIG_whisk_spi_ActivationStoreProvider": "org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStoreProvider"
+  when: db.activation_store.backend == "ElasticSearch"
+
+- name: merge elasticsearch activation store env
+  set_fact:
+    env: "{{ env | combine(elastic_env) }}"
+  when: db.activation_store.backend == "ElasticSearch"
+
 - name: include plugins
   include_tasks: "{{ inv_item }}.yml"
   with_items: "{{ invoker_plugins | default([]) }}"
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 4f72527..4c434cc 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -86,6 +86,8 @@ dependencies {
     compile "io.reactivex:rxjava-reactive-streams:1.2.1"
     compile "com.microsoft.azure:azure-cosmosdb:2.6.2"
 
+    compile "com.sksamuel.elastic4s:elastic4s-http_${gradle.scala.depVersion}:6.7.4"
+
     compile ("com.lightbend.akka:akka-stream-alpakka-s3_${gradle.scala.depVersion}:1.1.2") {
         exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http
         exclude group: 'com.fasterxml.jackson.core'
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 808241d..d8d0a3c 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -320,6 +320,21 @@ whisk {
         # }
     }
 
+    # ActivationStore related configuration
+    # For example:
+    # activationStore {
+    #   elasticsearch {
+    #      protocol         =       # "http" or "https"
+    #      hosts            =       # the host addresses of ES; multiple hosts can be combined with commas, like "172.17.0.1:9200,172.17.0.2:9200,172.17.0.3:9200"
+    #      index-pattern    =       # the index pattern that decides which index an activation is stored to; it is formatted with the activation's namespace,
+    #                                 for example, if the index-pattern is "openwhisk-%s", then activations under "whisk.system" will be saved to the index
+    #                                 "openwhisk-whisk.system". You can also save all namespaces' activations into one index by setting index-pattern to a raw
+    #                                 string like "openwhisk"
+    #      username         =       # username of the provided ES cluster
+    #      password         =       # password of the provided ES cluster
+    #   }
+    # }
+
     # transaction ID related configuration
     transactions {
         header = "X-Request-ID"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 62a4c98..51ac8f9 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -257,6 +257,7 @@ object ConfigKeys {
   val controllerActivation = s"$controller.activation"
 
   val activationStore = "whisk.activation-store"
+  val elasticSearchActivationStore = s"$activationStore.elasticsearch"
   val activationStoreWithFileStorage = s"$activationStore.with-file-storage"
 
   val metrics = "whisk.metrics"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala
new file mode 100644
index 0000000..5d97688
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.elasticsearch
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import scala.language.postfixOps
+import akka.actor.ActorSystem
+import akka.event.Logging.ErrorLevel
+import akka.http.scaladsl.model._
+import akka.stream.scaladsl.Flow
+import akka.stream._
+import com.sksamuel.elastic4s.http.search.SearchHit
+import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties, NoOpRequestConfigCallback}
+import com.sksamuel.elastic4s.indexes.IndexRequest
+import com.sksamuel.elastic4s.searches.queries.RangeQuery
+import com.sksamuel.elastic4s.searches.queries.matches.MatchPhrase
+import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
+import org.apache.http.impl.client.BasicCredentialsProvider
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
+import pureconfig.loadConfigOrThrow
+import pureconfig.generic.auto._
+import spray.json._
+import org.apache.openwhisk.common.{Logging, LoggingMarkers, TransactionId}
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.containerpool.logging.ElasticSearchJsonProtocol._
+import org.apache.openwhisk.core.database._
+import org.apache.openwhisk.core.database.StoreUtils._
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.Messages
+import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback
+
+import scala.concurrent.{ExecutionContextExecutor, Future, Promise}
+import scala.util.Try
+
+case class ElasticSearchActivationStoreConfig(protocol: String,
+                                              hosts: String,
+                                              indexPattern: String,
+                                              username: String,
+                                              password: String)
+
+class ElasticSearchActivationStore(
+  httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
+  elasticSearchConfig: ElasticSearchActivationStoreConfig =
+    loadConfigOrThrow[ElasticSearchActivationStoreConfig](ConfigKeys.elasticSearchActivationStore),
+  useBatching: Boolean = false)(implicit actorSystem: ActorSystem,
+                                actorMaterializer: ActorMaterializer,
+                                logging: Logging)
+    extends ActivationStore {
+
+  import com.sksamuel.elastic4s.http.ElasticDsl._
+
+  private implicit val executionContextExecutor: ExecutionContextExecutor = actorSystem.dispatcher
+
+  private val httpClientCallback = new HttpClientConfigCallback {
+    override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
+      val provider = new BasicCredentialsProvider
+      provider.setCredentials(
+        AuthScope.ANY,
+        new UsernamePasswordCredentials(elasticSearchConfig.username, elasticSearchConfig.password))
+      httpClientBuilder.setDefaultCredentialsProvider(provider)
+    }
+  }
+
+  private val client =
+    ElasticClient(
+      ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"),
+      NoOpRequestConfigCallback,
+      httpClientCallback)
+
+  private val esType = "_doc"
+  private val maxOpenDbRequests = actorSystem.settings.config
+    .getInt("akka.http.host-connection-pool.max-connections") / 2
+  private val batcher: Batcher[IndexRequest, Either[ArtifactStoreException, DocInfo]] =
+    new Batcher(500, maxOpenDbRequests)(doStore(_)(TransactionId.dbBatcher))
+
+  private val minStart = 0L
+  private val maxStart = Instant.now.toEpochMilli + TimeUnit.DAYS.toMillis(365 * 100) //100 years from now
+
+  override def store(activation: WhiskActivation, context: UserContext)(
+    implicit transid: TransactionId,
+    notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
+
+    val start =
+      transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] 'activations' document: '${activation.docid}'")
+
+    val path = activation.annotations
+      .getAs[String](WhiskActivation.pathAnnotation)
+      .getOrElse(s"${activation.namespace}/${activation.name}")
+    // Remove the `_id` field as it's not permitted in ElasticSearch, add a `path` field for search, and
+    // convert annotations to a JsObject since ElasticSearch doesn't support arrays with mixed types.
+    // response.result can be of any type, which ElasticSearch also doesn't support, so convert it to a string.
+    val response = JsObject(
+      activation.response.toJsonObject.fields
+        .updated("result", JsString(activation.response.result.toJson.compactPrint)))
+    val payload = JsObject(
+      activation.toDocumentRecord.fields - "_id" ++ Map(
+        "path" -> JsString(path),
+        "@timestamp" -> JsString(activation.start.toString),
+        "annotations" -> activation.annotations.toJsObject,
+        "response" -> response))
+
+    val index = generateIndex(activation.namespace.namespace)
+    val op = indexInto(index, esType).doc(payload.toString).id(activation.docid.asString)
+
+    // always use batching
+    val res = batcher.put(op).map {
+      case Right(docInfo) =>
+        transid
+          .finished(this, start, s"[PUT] 'activations' completed document: '${activation.docid}', response: '$docInfo'")
+        docInfo
+      case Left(e: ArtifactStoreException) =>
+        transid.failed(
+          this,
+          start,
+          s"[PUT] 'activations' failed to put document: '${activation.docid}'; ${e.getMessage}.",
+          ErrorLevel)
+        throw PutException("error on 'put'")
+    }
+
+    reportFailure(res, start, failure => s"[PUT] 'activations' internal error, failure: '${failure.getMessage}'")
+  }
+
+  private def doStore(ops: Seq[IndexRequest])(
+    implicit transid: TransactionId): Future[Seq[Either[ArtifactStoreException, DocInfo]]] = {
+    val count = ops.size
+    val start = transid.started(this, LoggingMarkers.DATABASE_BULK_SAVE, s"'activations' saving $count documents")
+    val res = client
+      .execute {
+        bulk(ops)
+      }
+      .map { res =>
+        if (res.status == StatusCodes.OK.intValue || res.status == StatusCodes.Created.intValue) {
+          res.result.items.map { bulkRes =>
+            if (bulkRes.status == StatusCodes.OK.intValue || bulkRes.status == StatusCodes.Created.intValue)
+              Right(DocInfo(bulkRes.id))
+            else
+              Left(PutException(
+                s"Unexpected error: ${bulkRes.error.map(e => s"${e.`type`}:${e.reason}").getOrElse("unknown")}, code: ${bulkRes.status} on 'bulk_put'"))
+          }
+        } else {
+          transid.failed(
+            this,
+            start,
+            s"'activations' failed to put documents, http status: '${res.status}'",
+            ErrorLevel)
+          throw PutException("Unexpected http response code: " + res.status)
+        }
+      }
+
+    reportFailure(res, start, failure => s"[PUT] 'activations' internal error, failure: '${failure.getMessage}'")
+  }
+
+  override def get(activationId: ActivationId, context: UserContext)(
+    implicit transid: TransactionId): Future[WhiskActivation] = {
+
+    val start =
+      transid.started(this, LoggingMarkers.DATABASE_GET, s"[GET] 'activations' finding activation: '$activationId'")
+
+    val index = generateIndex(extractNamespace(activationId))
+    val res = client
+      .execute {
+        search(index) query { termQuery("_id", activationId.asString) }
+      }
+      .map { res =>
+        if (res.status == StatusCodes.OK.intValue) {
+          if (res.result.hits.total == 0) {
+            transid.finished(this, start, s"[GET] 'activations', document: '$activationId'; not found.")
+            throw NoDocumentException("not found on 'get'")
+          } else {
+            transid.finished(this, start, s"[GET] 'activations' completed: found activation '$activationId'")
+            deserializeHitToWhiskActivation(res.result.hits.hits(0))
+          }
+        } else if (res.status == StatusCodes.NotFound.intValue) {
+          transid.finished(this, start, s"[GET] 'activations', document: '$activationId'; not found.")
+          throw NoDocumentException("not found on 'get'")
+        } else {
+          transid
+            .finished(
+              this,
+              start,
+              s"[GET] 'activations' failed to get document: '$activationId'; http status: '${res.status}'")
+          throw GetException("Unexpected http response code: " + res.status)
+        }
+      } recoverWith {
+      case _: DeserializationException => throw DocumentUnreadable(Messages.corruptedEntity)
+    }
+
+    reportFailure(
+      res,
+      start,
+      failure => s"[GET] 'activations' internal error, doc: '$activationId', failure: '${failure.getMessage}'")
+  }
+
+  override def delete(activationId: ActivationId, context: UserContext)(
+    implicit transid: TransactionId,
+    notifier: Option[CacheChangeNotification]): Future[Boolean] = {
+
+    val start =
+      transid.started(this, LoggingMarkers.DATABASE_DELETE, s"[DEL] 'activations' deleting document: '$activationId'")
+
+    val index = generateIndex(extractNamespace(activationId))
+
+    val res = client
+      .execute {
+        deleteByQuery(index, esType, termQuery("_id", activationId.asString))
+      }
+      .map { res =>
+        if (res.status == StatusCodes.OK.intValue) {
+          if (res.result.deleted == 0) {
+            transid.finished(this, start, s"[DEL] 'activations', document: '$activationId'; not found.")
+            throw NoDocumentException("not found on 'delete'")
+          } else {
+            transid
+              .finished(
+                this,
+                start,
+                s"[DEL] 'activations' completed document: '$activationId', response: ${res.result}")
+            true
+          }
+        } else if (res.status == StatusCodes.NotFound.intValue) {
+          transid.finished(this, start, s"[DEL] 'activations', document: '$activationId'; not found.")
+          throw NoDocumentException("not found on 'delete'")
+        } else {
+          transid.failed(
+            this,
+            start,
+            s"[DEL] 'activations' failed to delete document: '$activationId'; http status: '${res.status}'",
+            ErrorLevel)
+          throw DeleteException("Unexpected http response code: " + res.status)
+        }
+      }
+
+    reportFailure(
+      res,
+      start,
+      failure => s"[DEL] 'activations' internal error, doc: '$activationId', failure: '${failure.getMessage}'")
+  }
+
+  override def countActivationsInNamespace(namespace: EntityPath,
+                                           name: Option[EntityPath] = None,
+                                           skip: Int,
+                                           since: Option[Instant] = None,
+                                           upto: Option[Instant] = None,
+                                           context: UserContext)(implicit transid: TransactionId): Future[JsObject] = {
+    require(skip >= 0, "skip should be non negative")
+    val start = transid.started(this, LoggingMarkers.DATABASE_QUERY, s"[COUNT] 'activations'")
+
+    val nameQuery = name
+      .map { path =>
+        matchPhraseQuery("path", namespace.addPath(path).asString)
+      }
+      .getOrElse {
+        matchPhraseQuery("namespace", namespace.asString)
+      }
+    val startRange = generateRangeQuery("start", since, upto)
+
+    val index = generateIndex(namespace.namespace)
+
+    val res = client
+      .execute {
+        count(index) query { must(nameQuery, startRange) }
+      }
+      .map { res =>
+        if (res.status == StatusCodes.OK.intValue) {
+          val out = if (res.result.count > skip) res.result.count - skip else 0L
+          transid.finished(this, start, s"[COUNT] 'activations' completed: count $out")
+          JsObject(WhiskActivation.collectionName -> JsNumber(out))
+        } else {
+          transid.failed(this, start, s"Unexpected http response code: ${res.status}", ErrorLevel)
+          throw QueryException("Unexpected http response code: " + res.status)
+        }
+      }
+
+    reportFailure(res, start, failure => s"[COUNT] 'activations' internal error, failure: '${failure.getMessage}'")
+  }
+
+  override def listActivationsMatchingName(
+    namespace: EntityPath,
+    name: EntityPath,
+    skip: Int,
+    limit: Int,
+    includeDocs: Boolean = false,
+    since: Option[Instant] = None,
+    upto: Option[Instant] = None,
+    context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = {
+
+    val nameQuery = matchPhraseQuery("path", namespace.addPath(name).asString)
+    listActivations(namespace, skip, limit, nameQuery, includeDocs, since, upto, context)
+  }
+
+  override def listActivationsInNamespace(
+    namespace: EntityPath,
+    skip: Int,
+    limit: Int,
+    includeDocs: Boolean = false,
+    since: Option[Instant] = None,
+    upto: Option[Instant] = None,
+    context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = {
+
+    val nameQuery = matchPhraseQuery("namespace", namespace.asString)
+    listActivations(namespace, skip, limit, nameQuery, includeDocs, since, upto, context)
+  }
+
+  private def listActivations(
+    namespace: EntityPath,
+    skip: Int,
+    limit: Int,
+    nameQuery: MatchPhrase,
+    includeDocs: Boolean = false,
+    since: Option[Instant] = None,
+    upto: Option[Instant] = None,
+    context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = {
+
+    require(skip >= 0, "skip should be non negative")
+    require(limit >= 0, "limit should be non negative")
+
+    val start = transid.started(this, LoggingMarkers.DATABASE_QUERY, s"[QUERY] 'activations'")
+    val startRange = generateRangeQuery("start", since, upto)
+    val index = generateIndex(namespace.namespace)
+
+    val res = client
+      .execute {
+        search(index) query { must(nameQuery, startRange) } sortByFieldDesc "start" limit limit from skip
+      }
+      .map { res =>
+        if (res.status == StatusCodes.OK.intValue) {
+          val out =
+            if (includeDocs)
+              Right(res.result.hits.hits.map(deserializeHitToWhiskActivation).toList)
+            else
+              Left(res.result.hits.hits.map(deserializeHitToWhiskActivation(_).summaryAsJson).toList)
+          transid.finished(this, start, s"[QUERY] 'activations' completed: matched ${res.result.hits.total}")
+          out
+
+        } else {
+          transid.failed(this, start, s"Unexpected http response code: ${res.status}", ErrorLevel)
+          throw QueryException("Unexpected http response code: " + res.status)
+        }
+      }
+
+    reportFailure(res, start, failure => s"failed to query activation with error ${failure.getMessage}")
+  }
+
+  private def deserializeHitToWhiskActivation(hit: SearchHit): WhiskActivation = {
+    restoreAnnotations(restoreResponse(hit.sourceAsString.parseJson.asJsObject)).convertTo[WhiskActivation]
+  }
+
+  private def restoreAnnotations(js: JsObject): JsObject = {
+    val annotations = js.fields
+      .get("annotations")
+      .map { anno =>
+        Try {
+          JsArray(anno.asJsObject.fields map { p =>
+            JsObject("key" -> JsString(p._1), "value" -> p._2)
+          } toSeq: _*)
+        }.getOrElse(JsArray.empty)
+      }
+      .getOrElse(JsArray.empty)
+    JsObject(js.fields.updated("annotations", annotations))
+  }
+
+  private def restoreResponse(js: JsObject): JsObject = {
+    val response = js.fields
+      .get("response")
+      .map { res =>
+        val temp = res.asJsObject.fields
+        Try {
+          val result = temp
+            .get("result")
+            .map { r =>
+              val JsString(data) = r
+              data.parseJson.asJsObject
+            }
+            .getOrElse(JsObject.empty)
+          JsObject(temp.updated("result", result))
+        }.getOrElse(JsObject(temp - "result"))
+      }
+      .getOrElse(JsObject.empty)
+    JsObject(js.fields.updated("response", response))
+  }
+
+  private def extractNamespace(activationId: ActivationId): String = {
+    activationId.toString.split("/")(0)
+  }
+
+  private def generateIndex(namespace: String): String = {
+    elasticSearchConfig.indexPattern.dropWhile(_ == '/') format namespace.toLowerCase
+  }
+
+  private def generateRangeQuery(key: String, since: Option[Instant], upto: Option[Instant]): RangeQuery = {
+    rangeQuery(key)
+      .gte(since.map(_.toEpochMilli).getOrElse(minStart))
+      .lte(upto.map(_.toEpochMilli).getOrElse(maxStart))
+  }
+}
+
+object ElasticSearchActivationStoreProvider extends ActivationStoreProvider {
+  override def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging) =
+    new ElasticSearchActivationStore(useBatching = true)(actorSystem, actorMaterializer, logging)
+}
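
Outside the SPI wiring, the store can also be constructed with an explicit config, which is how the tests below use it. A minimal sketch, assuming an implicit `ActorSystem`, `ActorMaterializer`, and `Logging` are in scope, with placeholder connection values:

```scala
// Sketch: constructing the store directly instead of via
// ElasticSearchActivationStoreProvider (host/credentials are placeholders).
val config = ElasticSearchActivationStoreConfig(
  protocol = "http",
  hosts = "172.17.0.1:9200",
  indexPattern = "openwhisk-%s",
  username = "admin",
  password = "admin")

val store = new ElasticSearchActivationStore(
  httpFlow = None,
  elasticSearchConfig = config,
  useBatching = true) // implicits: actorSystem, actorMaterializer, logging
```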
diff --git a/tests/build.gradle b/tests/build.gradle
index fa9bff7..3234b0d 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -213,6 +213,7 @@ dependencies {
     compile "io.fabric8:kubernetes-server-mock:${gradle.kube_client.version}"
 
     compile "com.amazonaws:aws-java-sdk-s3:1.11.295"
+    compile "org.testcontainers:elasticsearch:1.12.3"
 
     compile project(':common:scala')
     compile project(':core:controller')
diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2
index e6a10df..98716e8 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -92,6 +92,10 @@ whisk {
             url = "{{ user_images_registry | default('') }}"
         }
     }
+
+    elasticsearch {
+        docker-image = "{{ elasticsearch.docker_image | default('docker.elastic.co/elasticsearch/elasticsearch:' ~ elasticsearch.version ) }}"
+    }
 }
 
 #test-only overrides so that tests can override defaults in application.conf (todo: move all defaults to reference.conf)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStoreBehaviorBase.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStoreBehaviorBase.scala
new file mode 100644
index 0000000..a13ac8c
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStoreBehaviorBase.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.elasticsearch
+
+import org.scalatest.FlatSpec
+import org.apache.openwhisk.core.controller.test.WhiskAuthHelpers
+import org.apache.openwhisk.core.database.UserContext
+import org.apache.openwhisk.core.database.test.behavior.ActivationStoreBehaviorBase
+import org.apache.openwhisk.core.entity.{ActivationResponse, Parameters, WhiskActivation}
+import org.testcontainers.elasticsearch.ElasticsearchContainer
+import pureconfig.loadConfigOrThrow
+import spray.json.{JsObject, JsString}
+
+trait ElasticSearchActivationStoreBehaviorBase extends FlatSpec with ActivationStoreBehaviorBase {
+  val imageName = loadConfigOrThrow[String]("whisk.elasticsearch.docker-image")
+  val container = new ElasticsearchContainer(imageName)
+  container.start()
+
+  override def afterAll = {
+    container.close()
+    super.afterAll()
+  }
+
+  override def storeType = "ElasticSearch"
+
+  val creds = WhiskAuthHelpers.newIdentity()
+  override val context = UserContext(creds)
+
+  override lazy val activationStore = {
+    val storeConfig =
+      ElasticSearchActivationStoreConfig("http", container.getHttpHostAddress, "unittest-%s", "fake", "fake")
+    new ElasticSearchActivationStore(None, storeConfig, true)
+  }
+
+  // add result and annotations
+  override def newActivation(ns: String, actionName: String, start: Long): WhiskActivation = {
+    super
+      .newActivation(ns, actionName, start)
+      .copy(
+        response = ActivationResponse.success(Some(JsObject("name" -> JsString("whisker")))),
+        annotations = Parameters("database", "elasticsearch") ++ Parameters("type", "test"))
+  }
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStoreTests.scala
new file mode 100644
index 0000000..6fcc8db
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStoreTests.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.elasticsearch
+
+import java.time.Instant
+
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.database.UserContext
+import org.apache.openwhisk.core.database.test.behavior.ActivationStoreBehavior
+import org.apache.openwhisk.core.entity.{EntityPath, WhiskActivation}
+import org.apache.openwhisk.utils.retry
+
+@RunWith(classOf[JUnitRunner])
+class ElasticSearchActivationStoreTests
+    extends FlatSpec
+    with ElasticSearchActivationStoreBehaviorBase
+    with ActivationStoreBehavior {
+
+  override def checkGetActivation(activation: WhiskActivation)(implicit transid: TransactionId): Unit = {
+    retry(super.checkGetActivation(activation), 10)
+  }
+
+  override def checkDeleteActivation(activation: WhiskActivation)(implicit transid: TransactionId): Unit = {
+    retry(super.checkDeleteActivation(activation), 10)
+  }
+
+  override def checkQueryActivations(namespace: String,
+                                     name: Option[String] = None,
+                                     skip: Int = 0,
+                                     limit: Int = 1000,
+                                     includeDocs: Boolean = false,
+                                     since: Option[Instant] = None,
+                                     upto: Option[Instant] = None,
+                                     context: UserContext,
+                                     expected: IndexedSeq[WhiskActivation])(implicit transid: TransactionId): Unit = {
+    retry(super.checkQueryActivations(namespace, name, skip, limit, includeDocs, since, upto, context, expected), 10)
+  }
+
+  override def checkCountActivations(namespace: String,
+                                     name: Option[EntityPath] = None,
+                                     skip: Int = 0,
+                                     since: Option[Instant] = None,
+                                     upto: Option[Instant] = None,
+                                     context: UserContext,
+                                     expected: Long)(implicit transid: TransactionId): Unit = {
+    retry(super.checkCountActivations(namespace, name, skip, since, upto, context, expected), 10)
+  }
+}
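
The `retry` wrappers above account for ElasticSearch's near-real-time indexing: because `get` and the list/count operations are implemented as searches, a freshly stored activation only becomes visible after the index refreshes (1s by default), so each assertion is retried instead of checked once.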
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ActivationStoreCRUDBehaviors.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ActivationStoreCRUDBehaviors.scala
index 506ccec..8427527 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ActivationStoreCRUDBehaviors.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ActivationStoreCRUDBehaviors.scala
@@ -19,12 +19,24 @@ package org.apache.openwhisk.core.database.test.behavior
 
 import org.apache.openwhisk.common.TransactionId
 import org.apache.openwhisk.core.database.NoDocumentException
-import org.apache.openwhisk.core.entity.ActivationId
+import org.apache.openwhisk.core.entity.{ActivationId, WhiskActivation}
 
 import scala.util.Random
 
 trait ActivationStoreCRUDBehaviors extends ActivationStoreBehaviorBase {
 
+  protected def checkStoreActivation(activation: WhiskActivation)(implicit transid: TransactionId): Unit = {
+    store(activation, context) shouldBe activation.docinfo
+  }
+
+  protected def checkDeleteActivation(activation: WhiskActivation)(implicit transid: TransactionId): Unit = {
+    activationStore.delete(ActivationId(activation.docid.asString), context).futureValue shouldBe true
+  }
+
+  protected def checkGetActivation(activation: WhiskActivation)(implicit transid: TransactionId): Unit = {
+    activationStore.get(ActivationId(activation.docid.asString), context).futureValue shouldBe activation
+  }
+
   behavior of s"${storeType}ActivationStore store"
 
   it should "put activation and get docinfo" in {
@@ -32,8 +44,7 @@ trait ActivationStoreCRUDBehaviors extends ActivationStoreBehaviorBase {
     val namespace = s"ns_${Random.alphanumeric.take(4).mkString}"
     val action = s"action1_${Random.alphanumeric.take(4).mkString}"
     val activation = newActivation(namespace, action, 1L)
-    val doc = store(activation, context)
-    doc shouldBe activation.docinfo
+    checkStoreActivation(activation)
   }
 
   behavior of s"${storeType}ActivationStore delete"
@@ -44,7 +55,7 @@ trait ActivationStoreCRUDBehaviors extends ActivationStoreBehaviorBase {
     val action = s"action1_${Random.alphanumeric.take(4).mkString}"
     val activation = newActivation(namespace, action, 1L)
     store(activation, context)
-    activationStore.delete(ActivationId(activation.docid.asString), context).futureValue shouldBe true
+    checkDeleteActivation(activation)
   }
 
   it should "throws NoDocumentException when activation does not exist" in {
@@ -60,7 +71,7 @@ trait ActivationStoreCRUDBehaviors extends ActivationStoreBehaviorBase {
     val action = s"action1_${Random.alphanumeric.take(4).mkString}"
     val activation = newActivation(namespace, action, 1L)
     store(activation, context)
-    activationStore.get(ActivationId(activation.docid.asString), context).futureValue shouldBe activation
+    checkGetActivation(activation)
   }
 
   it should "throws NoDocumentException when activation does not exist" in {
diff --git a/tools/build/README.md b/tools/build/README.md
index fcbd356..0282482 100644
--- a/tools/build/README.md
+++ b/tools/build/README.md
@@ -30,6 +30,7 @@ The script is called `redo` because for most development, one will want to "redo
 - usage information: `redo -h`
 - initialize environment and `docker-machine` (for mac): `redo setup prereq`
 - start CouchDB container and initialize DB with system and guest keys: `redo couchdb initdb`
+- start ElasticSearch container to store activations: `redo elasticsearch`
 - build and deploy system: `redo deploy`
 - run tests: `redo props tests`
 
diff --git a/tools/build/redo b/tools/build/redo
index 0534409..9ffa04e 100755
--- a/tools/build/redo
+++ b/tools/build/redo
@@ -237,6 +237,10 @@ Components = [
                   'recreate main db for entities',
                   yaml = 'wipe.yml'),
 
+    makeComponent('elasticsearch',
+                  'deploy elasticsearch',
+                  modes = 'clean'),
+
     makeComponent('build',
                   'build system',
                   yaml = False,
diff --git a/tools/travis/runUnitTests.sh b/tools/travis/runUnitTests.sh
index 762aea6..e2ef4ac 100755
--- a/tools/travis/runUnitTests.sh
+++ b/tools/travis/runUnitTests.sh
@@ -23,6 +23,7 @@ SCRIPTDIR=$(cd $(dirname "$0") && pwd)
 ROOTDIR="$SCRIPTDIR/../.."
 
 cd $ROOTDIR/tools/travis
+export TESTCONTAINERS_RYUK_DISABLED="true"
 export ORG_GRADLE_PROJECT_testSetName="REQUIRE_ONLY_DB"
 
 ./scan.sh