You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2020/03/03 14:00:16 UTC
[openwhisk] branch master updated: Implement an
ElasticSearchActivationStore (#4724)
This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 05ed4e1 Implement an ElasticSearchActivationStore (#4724)
05ed4e1 is described below
commit 05ed4e18eb53f5630735ffbe28726dce5a3481b5
Author: jiangpch <ji...@navercorp.com>
AuthorDate: Tue Mar 3 22:00:03 2020 +0800
Implement an ElasticSearchActivationStore (#4724)
* Implement an ElasticSearchActivationStore
* Remove short headers and mismatched comments
* typo fix
* Use TestContainers
* Neat changes
* Update build.gradle
* Fix TestContainers
* fix mistake
* Remove cache and add configuration example
* Fix pureconfig
* Apply review comments
* Add result and annotations to mock WhiskActivations
* Add instructions about how to use it
* Update ansible/README.md
* Remove vagrant changes
* Add elasticsearch to `redo` and remove whitespace
* Fix tests
---
ansible/README.md | 40 ++
.../runUnitTests.sh => ansible/elasticsearch.yml | 38 +-
ansible/environments/docker-machine/hosts.j2.ini | 3 +
ansible/environments/local/hosts.j2.ini | 3 +
ansible/group_vars/all | 30 ++
ansible/roles/controller/tasks/deploy.yml | 16 +
ansible/roles/elasticsearch/tasks/clean.yml | 39 ++
ansible/roles/elasticsearch/tasks/deploy.yml | 94 +++++
.../roles/elasticsearch/tasks/main.yml | 26 +-
.../elasticsearch/templates/elasticsearch.yml.j2 | 23 ++
.../elasticsearch/templates/log4j2.properties.j2 | 10 +
ansible/roles/invoker/tasks/deploy.yml | 16 +
common/scala/build.gradle | 2 +
common/scala/src/main/resources/application.conf | 15 +
.../org/apache/openwhisk/core/WhiskConfig.scala | 1 +
.../ElasticSearchActivationStore.scala | 417 +++++++++++++++++++++
tests/build.gradle | 1 +
tests/src/test/resources/application.conf.j2 | 4 +
.../ElasticSearchActivationStoreBehaviorBase.scala | 58 +++
.../ElasticSearchActivationStoreTests.scala | 66 ++++
.../behavior/ActivationStoreCRUDBehaviors.scala | 21 +-
tools/build/README.md | 1 +
tools/build/redo | 4 +
tools/travis/runUnitTests.sh | 1 +
24 files changed, 886 insertions(+), 43 deletions(-)
diff --git a/ansible/README.md b/ansible/README.md
index 25955e5..5a685e6 100644
--- a/ansible/README.md
+++ b/ansible/README.md
@@ -191,6 +191,46 @@ ansible-playbook -i environments/<environment> routemgmt.yml
- To use the API Gateway, you'll need to run `apigateway.yml` and `routemgmt.yml`.
- Use `ansible-playbook -i environments/<environment> openwhisk.yml` to avoid wiping the data store. This is useful to start OpenWhisk after restarting your Operating System.
+### Using ElasticSearch to Store Activations
+
+You can use ElasticSearch (ES) to store activations separately while other entities remain stored in CouchDB. There is an Ansible playbook to setup a simple ES cluster for testing and development purposes.
+
+- Provide your custom ES related ansible arguments:
+
+```
+elastic_protocol="http"
+elastic_index_pattern="openwhisk-%s" // this will be combined with namespace's name, so different namespace can use different index
+elastic_base_volume="esdata" // name of docker volume to store ES data
+elastic_cluster_name="openwhisk"
+elastic_java_opts="-Xms1g -Xmx1g"
+elastic_loglevel="INFO"
+elastic_username="admin"
+elastic_password="admin"
+elasticsearch_connect_string="x.x.x.x:9200,y.y.y.y:9200" // if you want to use an external ES cluster, add it
+```
+
+- Then execute:
+
+```
+cd <openwhisk_home>
+./gradlew distDocker
+cd ansible
+# couchdb is still needed to store subjects and actions
+ansible-playbook -i environments/<environment> couchdb.yml
+ansible-playbook -i environments/<environment> initdb.yml
+ansible-playbook -i environments/<environment> wipe.yml
+# this will deploy a simple ES cluster, you can skip this to use external ES cluster
+ansible-playbook -i environments/<environment> elasticsearch.yml
+ansible-playbook -i environments/<environment> openwhisk.yml -e db_activation_backend=ElasticSearch
+
+# installs a catalog of public packages and actions
+ansible-playbook -i environments/<environment> postdeploy.yml
+
+# to use the API gateway
+ansible-playbook -i environments/<environment> apigateway.yml
+ansible-playbook -i environments/<environment> routemgmt.yml
+```
+
### Configuring the installation of `wsk` CLI
There are two installation modes to install `wsk` CLI: remote and local.
diff --git a/tools/travis/runUnitTests.sh b/ansible/elasticsearch.yml
old mode 100755
new mode 100644
similarity index 52%
copy from tools/travis/runUnitTests.sh
copy to ansible/elasticsearch.yml
index 762aea6..ed05e3d
--- a/tools/travis/runUnitTests.sh
+++ b/ansible/elasticsearch.yml
@@ -1,5 +1,3 @@
-#!/usr/bin/env bash
-
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -16,21 +14,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-set -e
-
-SCRIPTDIR=$(cd $(dirname "$0") && pwd)
-ROOTDIR="$SCRIPTDIR/../.."
-
-cd $ROOTDIR/tools/travis
-export ORG_GRADLE_PROJECT_testSetName="REQUIRE_ONLY_DB"
-
-./scan.sh
-
-./setupPrereq.sh
-
-cat "$ROOTDIR/tests/src/test/resources/application.conf"
-
-./distDocker.sh
-
-./runTests.sh
+---
+# This playbook deploys a ElasticSearch cluster
+
+- hosts: elasticsearch
+ vars:
+ #
+ # host_group - usually "{{ groups['...'] }}" where '...' is what was used
+ # for 'hosts' above. The hostname of each host will be looked up in this
+ # group to assign a zero-based index. That index will be used in concert
+ # with 'name_prefix' below to assign a host/container name.
+ host_group: "{{ groups['elasticsearch'] }}"
+ #
+ # name_prefix - a unique prefix for this set of elasticsearches. The prefix
+ # will be used in combination with an index (determined using
+ # 'host_group' above) to name host/elasticsearcher.
+ name_prefix: "elasticsearch"
+ roles:
+ - elasticsearch
diff --git a/ansible/environments/docker-machine/hosts.j2.ini b/ansible/environments/docker-machine/hosts.j2.ini
index 1a49698..45f327b 100644
--- a/ansible/environments/docker-machine/hosts.j2.ini
+++ b/ansible/environments/docker-machine/hosts.j2.ini
@@ -36,6 +36,9 @@ invoker1 ansible_host={{ docker_machine_ip }}
[apigateway]
{{ docker_machine_ip }} ansible_host={{ docker_machine_ip }}
+[elasticsearch:children]
+db
+
; define variables
[all:vars]
ansible_connection=ssh
diff --git a/ansible/environments/local/hosts.j2.ini b/ansible/environments/local/hosts.j2.ini
index 210df4c..7512d966 100644
--- a/ansible/environments/local/hosts.j2.ini
+++ b/ansible/environments/local/hosts.j2.ini
@@ -31,6 +31,9 @@ invoker1 ansible_host=172.17.0.1 ansible_connection=local
[db]
172.17.0.1 ansible_host=172.17.0.1 ansible_connection=local
+[elasticsearch:children]
+db
+
[redis]
172.17.0.1 ansible_host=172.17.0.1 ansible_connection=local
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 7fb49c0..545f1b2 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -272,6 +272,27 @@ db:
invoker:
user: "{{ db_invoker_user | default(lookup('ini', 'db_username section=invoker file={{ playbook_dir }}/db_local.ini')) }}"
pass: "{{ db_invoker_pass | default(lookup('ini', 'db_password section=invoker file={{ playbook_dir }}/db_local.ini')) }}"
+ activation_store:
+ backend: "{{ db_activation_backend | default('CouchDB') }}"
+ elasticsearch:
+ protocol: "{{ elastic_protocol | default('http') }}"
+ port: 9200
+ index_pattern: "{{ elastic_index_pattern | default('openwhisk-%s') }}"
+ base_transport_port: 9300
+ confdir: "{{ config_root_dir }}/elasticsearch"
+ dir:
+ become: "{{ elastic_dir_become | default(false) }}"
+ base_volume: "{{ elastic_base_volume | default('esdata') }}"
+ cluster_name: "{{ elastic_cluster_name | default('openwhisk') }}"
+ java_opts: "{{ elastic_java_opts | default('-Xms1g -Xmx1g') }}"
+ loglevel: "{{ elastic_loglevel | default('INFO') }}"
+ # the user id of elasticsearch process, default is 1000, if you have enabled user namespace
+ # for docker daemon, this need to be changed correspondingly
+ uid: "{{ elastic_uid | default(1000) }}"
+ auth:
+ admin:
+ username: "{{ elastic_username | default('admin') }}"
+ password: "{{ elastic_password | default('admin') }}"
apigateway:
port:
@@ -291,6 +312,15 @@ linux:
couchdb:
version: 2.3
+elasticsearch:
+ version: 6.7.2
+
+elasticsearch_connect_string: "{% set ret = [] %}\
+ {% for host in groups['elasticsearch'] %}\
+ {{ ret.append( hostvars[host].ansible_host + ':' + ((db.elasticsearch.port+loop.index-1)|string) ) }}\
+ {% endfor %}\
+ {{ ret | join(',') }}"
+
docker:
# The user to install docker for. Defaults to the ansible user if not set. This will be the user who is able to run
# docker commands on a machine setup with prereq_build.yml
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 7e5ec60..11d3a9d 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -275,6 +275,22 @@
set_fact:
env: "{{ env | combine(controller.extraEnv) }}"
+- name: setup elasticsearch activation store env
+ set_fact:
+ elastic_env:
+ "CONFIG_whisk_activationStore_elasticsearch_protocol": "{{ db.elasticsearch.protocol}}"
+ "CONFIG_whisk_activationStore_elasticsearch_hosts": "{{ elasticsearch_connect_string }}"
+ "CONFIG_whisk_activationStore_elasticsearch_indexPattern": "{{ db.elasticsearch.index_pattern }}"
+ "CONFIG_whisk_activationStore_elasticsearch_username": "{{ db.elasticsearch.auth.admin.username }}"
+ "CONFIG_whisk_activationStore_elasticsearch_password": "{{ db.elasticsearch.auth.admin.password }}"
+ "CONFIG_whisk_spi_ActivationStoreProvider": "org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStoreProvider"
+ when: db.activation_store.backend == "ElasticSearch"
+
+- name: merge elasticsearch activation store env
+ set_fact:
+ env: "{{ env | combine(elastic_env) }}"
+ when: db.activation_store.backend == "ElasticSearch"
+
- name: populate volumes for controller
set_fact:
controller_volumes:
diff --git a/ansible/roles/elasticsearch/tasks/clean.yml b/ansible/roles/elasticsearch/tasks/clean.yml
new file mode 100644
index 0000000..21109b7
--- /dev/null
+++ b/ansible/roles/elasticsearch/tasks/clean.yml
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+---
+# Remove ElasticSearch server
+
+- name: set elasticsearch container name and volume
+ set_fact:
+ elasticsearch_name: "{{ name_prefix ~ host_group.index(inventory_hostname) }}"
+ volume_name: "{{ db.elasticsearch.base_volume ~ host_group.index(inventory_hostname) }}"
+
+- name: remove ElasticSearch
+ vars:
+ elasticsearch_image: "{{ elasticsearch.docker_image | default('docker.elastic.co/elasticsearch/elasticsearch:' ~ elasticsearch.version ) }}"
+ docker_container:
+ name: "{{ elasticsearch_name }}"
+ image: "{{ elasticsearch_image }}"
+ keep_volumes: False
+ state: absent
+ ignore_errors: True
+
+- name: remove ElasticSearch conf dir
+ file:
+ path: "{{ db.elasticsearch.confdir }}/{{ elasticsearch_name }}"
+ state: absent
+ become: "{{ db.elasticsearch.dir.become }}"
diff --git a/ansible/roles/elasticsearch/tasks/deploy.yml b/ansible/roles/elasticsearch/tasks/deploy.yml
new file mode 100644
index 0000000..3728377
--- /dev/null
+++ b/ansible/roles/elasticsearch/tasks/deploy.yml
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+---
+# This role will run a ElasticSearch server on the db group
+
+- name: set the vm.max_map_count to 262144
+ sysctl:
+ name: vm.max_map_count
+ value: '262144'
+ become: true
+
+- name: set elasticsearch container name, volume and port
+ set_fact:
+ elasticsearch_name: "{{ name_prefix ~ host_group.index(inventory_hostname) }}"
+ volume_name: "{{ db.elasticsearch.base_volume ~ host_group.index(inventory_hostname) }}"
+ http_port: "{{ (db.elasticsearch.port|int) + host_group.index(inventory_hostname) }}"
+ transport_port: "{{ (db.elasticsearch.base_transport_port|int) + host_group.index(inventory_hostname) }}"
+
+- name: ensure elasticserach config directory is created with permissions
+ file:
+ path: "{{ db.elasticsearch.confdir }}/{{ elasticsearch_name }}"
+ state: directory
+ mode: 0755
+ become: "{{ db.elasticsearch.dir.become }}"
+
+# create volume direcotry if it's a directory path(not a named volume)
+- name: ensure elasticserach volume directory is created with permissions
+ file:
+ path: "{{ volume_name }}"
+ state: directory
+ mode: 0700
+ owner: "{{ db.elasticsearch.uid }}"
+ become: true
+ when: volume_name is search("/")
+
+- name: copy elasticsearch config file
+ template:
+ src: "elasticsearch.yml.j2"
+ dest: "{{ db.elasticsearch.confdir }}/{{ elasticsearch_name }}/elasticsearch.yml"
+ mode: 0644
+ become: "{{ db.elasticsearch.dir.become }}"
+
+- name: copy elasticsearch log config file
+ template:
+ src: "log4j2.properties.j2"
+ dest: "{{ db.elasticsearch.confdir }}/{{ elasticsearch_name }}/log4j2.properties"
+ mode: 0644
+ become: "{{ db.elasticsearch.dir.become }}"
+
+- name: "(re)start ElasticSearch from '{{ elasticsearch_image }} ' "
+ vars:
+ elasticsearch_image: "{{ elasticsearch.docker_image | default('docker.elastic.co/elasticsearch/elasticsearch:' ~ elasticsearch.version ) }}"
+ docker_container:
+ name: "{{ elasticsearch_name }}"
+ image: "{{ elasticsearch_image }}"
+ state: started
+ recreate: true
+ restart_policy: "{{ docker.restart.policy }}"
+ ports:
+ - "{{ http_port }}:9200"
+ - "{{ transport_port }}:{{ transport_port }}"
+ volumes:
+ - "{{ db.elasticsearch.confdir }}/{{ elasticsearch_name }}/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml"
+ - "{{ db.elasticsearch.confdir }}/{{ elasticsearch_name }}/log4j2.properties:/usr/share/elasticsearch/config/log4j2.properties"
+ - "{{ volume_name }}:/usr/share/elasticsearch/data"
+ pull: "{{ docker.pull_elasticsearch | default(true) }}"
+ ulimits:
+ - "nofile:262144:262144"
+ - "memlock:-1:-1"
+ env:
+ TZ: "{{ docker.timezone }}"
+ ES_JAVA_OPTS: "{{ db.elasticsearch.java_opts }}"
+
+- name: wait until ElasticSearch in this host is up and running
+ uri:
+ url: "{{ db.elasticsearch.protocol }}://{{ ansible_host }}:{{ http_port }}"
+ register: result
+ until: result.status == 200
+ retries: 12
+ delay: 5
diff --git a/tools/travis/runUnitTests.sh b/ansible/roles/elasticsearch/tasks/main.yml
old mode 100755
new mode 100644
similarity index 70%
copy from tools/travis/runUnitTests.sh
copy to ansible/roles/elasticsearch/tasks/main.yml
index 762aea6..e5cc947
--- a/tools/travis/runUnitTests.sh
+++ b/ansible/roles/elasticsearch/tasks/main.yml
@@ -1,5 +1,3 @@
-#!/usr/bin/env bash
-
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -16,21 +14,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+---
+# This role will deploy a database server. Use the role if you want to use ElasticSearch locally.
+# In deploy mode it will start the ElasticSearch container.
+# In clean mode it will remove the ElasticSearch container.
-set -e
-
-SCRIPTDIR=$(cd $(dirname "$0") && pwd)
-ROOTDIR="$SCRIPTDIR/../.."
-
-cd $ROOTDIR/tools/travis
-export ORG_GRADLE_PROJECT_testSetName="REQUIRE_ONLY_DB"
-
-./scan.sh
-
-./setupPrereq.sh
-
-cat "$ROOTDIR/tests/src/test/resources/application.conf"
-
-./distDocker.sh
+- import_tasks: deploy.yml
+ when: mode == "deploy"
-./runTests.sh
+- import_tasks: clean.yml
+ when: mode == "clean"
diff --git a/ansible/roles/elasticsearch/templates/elasticsearch.yml.j2 b/ansible/roles/elasticsearch/templates/elasticsearch.yml.j2
new file mode 100644
index 0000000..f3129a3
--- /dev/null
+++ b/ansible/roles/elasticsearch/templates/elasticsearch.yml.j2
@@ -0,0 +1,23 @@
+cluster.name: "{{ db.elasticsearch.cluster_name }}"
+node.name: "{{ elasticsearch_name }}"
+network.host: 0.0.0.0
+network.publish_host: {{ ansible_default_ipv4.address }}
+
+http.port: 9200
+transport.tcp.port: {{ transport_port }}
+
+# minimum_master_nodes need to be explicitly set when bound on a public IP
+# set to 1 to allow single node clusters
+# Details: https://github.com/elastic/elasticsearch/pull/17282
+discovery.zen.ping.unicast.hosts:
+{% for es in groups['elasticsearch'] %}
+ - {{ hostvars[es].ansible_host }}:{{ db.elasticsearch.base_transport_port + host_group.index(es)|int }}
+{% endfor %}
+discovery.zen.minimum_master_nodes: {{ (host_group|length / 2 + 1) | int}}
+
+gateway.recover_after_nodes: {{ (host_group|length / 2 + 1) | int }}
+gateway.expected_nodes: {{ host_group|length }}
+gateway.recover_after_time: 5m
+
+xpack.security.enabled: false
+bootstrap.memory_lock: true
diff --git a/ansible/roles/elasticsearch/templates/log4j2.properties.j2 b/ansible/roles/elasticsearch/templates/log4j2.properties.j2
new file mode 100644
index 0000000..cc8edde
--- /dev/null
+++ b/ansible/roles/elasticsearch/templates/log4j2.properties.j2
@@ -0,0 +1,10 @@
+status = error
+
+appender.console.type = Console
+appender.console.name = console
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] %marker%m%n
+
+rootLogger.appenderRef.console.ref = console
+
+rootLogger.level = {{ db.elasticsearch.loglevel }}
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 596a202..62361e6 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -290,6 +290,22 @@
set_fact:
env: "{{ env | combine(invoker.extraEnv) }}"
+- name: setup elasticsearch activation store env
+ set_fact:
+ elastic_env:
+ "CONFIG_whisk_activationStore_elasticsearch_protocol": "{{ db.elasticsearch.protocol}}"
+ "CONFIG_whisk_activationStore_elasticsearch_hosts": "{{ elasticsearch_connect_string }}"
+ "CONFIG_whisk_activationStore_elasticsearch_indexPattern": "{{ db.elasticsearch.index_pattern }}"
+ "CONFIG_whisk_activationStore_elasticsearch_username": "{{ db.elasticsearch.auth.admin.username }}"
+ "CONFIG_whisk_activationStore_elasticsearch_password": "{{ db.elasticsearch.auth.admin.password }}"
+ "CONFIG_whisk_spi_ActivationStoreProvider": "org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStoreProvider"
+ when: db.activation_store.backend == "ElasticSearch"
+
+- name: merge elasticsearch activation store env
+ set_fact:
+ env: "{{ env | combine(elastic_env) }}"
+ when: db.activation_store.backend == "ElasticSearch"
+
- name: include plugins
include_tasks: "{{ inv_item }}.yml"
with_items: "{{ invoker_plugins | default([]) }}"
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 4f72527..4c434cc 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -86,6 +86,8 @@ dependencies {
compile "io.reactivex:rxjava-reactive-streams:1.2.1"
compile "com.microsoft.azure:azure-cosmosdb:2.6.2"
+ compile "com.sksamuel.elastic4s:elastic4s-http_${gradle.scala.depVersion}:6.7.4"
+
compile ("com.lightbend.akka:akka-stream-alpakka-s3_${gradle.scala.depVersion}:1.1.2") {
exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http
exclude group: 'com.fasterxml.jackson.core'
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 808241d..d8d0a3c 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -320,6 +320,21 @@ whisk {
# }
}
+ # ActivationStore related configuration
+ # For example:
+ # activationStore {
+ # elasticsearch {
+ # protocol = # "http" or "https"
+ # hosts = # the hosts address of ES, can be multi hosts combined with commas, like "172.17.0.1:9200,172.17.0.2:9200,172.17.0.3:9200"
+ # index-pattern = # the index pattern used to tell which index an activation should be stored to, will be calculated with activation namespace,
+ # for example, if the index-pattern is "openwhisk-%s", then activations under "whisk.system" will be saved in to index
+ # "openwhisk-whisk.system", you can also save all namespaces activations into one index by set index-pattern to a raw string
+ # like "openwhisk"
+ # username = # username of the provide ES
+ # password = # password of the provide ES
+ # }
+ # }
+
# transaction ID related configuration
transactions {
header = "X-Request-ID"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 62a4c98..51ac8f9 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -257,6 +257,7 @@ object ConfigKeys {
val controllerActivation = s"$controller.activation"
val activationStore = "whisk.activation-store"
+ val elasticSearchActivationStore = s"$activationStore.elasticsearch"
val activationStoreWithFileStorage = s"$activationStore.with-file-storage"
val metrics = "whisk.metrics"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala
new file mode 100644
index 0000000..5d97688
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.elasticsearch
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import scala.language.postfixOps
+import akka.actor.ActorSystem
+import akka.event.Logging.ErrorLevel
+import akka.http.scaladsl.model._
+import akka.stream.scaladsl.Flow
+import akka.stream._
+import com.sksamuel.elastic4s.http.search.SearchHit
+import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties, NoOpRequestConfigCallback}
+import com.sksamuel.elastic4s.indexes.IndexRequest
+import com.sksamuel.elastic4s.searches.queries.RangeQuery
+import com.sksamuel.elastic4s.searches.queries.matches.MatchPhrase
+import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
+import org.apache.http.impl.client.BasicCredentialsProvider
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
+import pureconfig.loadConfigOrThrow
+import pureconfig.generic.auto._
+import spray.json._
+import org.apache.openwhisk.common.{Logging, LoggingMarkers, TransactionId}
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.containerpool.logging.ElasticSearchJsonProtocol._
+import org.apache.openwhisk.core.database._
+import org.apache.openwhisk.core.database.StoreUtils._
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.Messages
+import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback
+
+import scala.concurrent.{ExecutionContextExecutor, Future, Promise}
+import scala.util.Try
+
+case class ElasticSearchActivationStoreConfig(protocol: String,
+ hosts: String,
+ indexPattern: String,
+ username: String,
+ password: String)
+
+class ElasticSearchActivationStore(
+ httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
+ elasticSearchConfig: ElasticSearchActivationStoreConfig =
+ loadConfigOrThrow[ElasticSearchActivationStoreConfig](ConfigKeys.elasticSearchActivationStore),
+ useBatching: Boolean = false)(implicit actorSystem: ActorSystem,
+ actorMaterializer: ActorMaterializer,
+ logging: Logging)
+ extends ActivationStore {
+
+ import com.sksamuel.elastic4s.http.ElasticDsl._
+
+ private implicit val executionContextExecutor: ExecutionContextExecutor = actorSystem.dispatcher
+
+ private val httpClientCallback = new HttpClientConfigCallback {
+ override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
+ val provider = new BasicCredentialsProvider
+ provider.setCredentials(
+ AuthScope.ANY,
+ new UsernamePasswordCredentials(elasticSearchConfig.username, elasticSearchConfig.password))
+ httpClientBuilder.setDefaultCredentialsProvider(provider)
+ }
+ }
+
+ private val client =
+ ElasticClient(
+ ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"),
+ NoOpRequestConfigCallback,
+ httpClientCallback)
+
+ private val esType = "_doc"
+ private val maxOpenDbRequests = actorSystem.settings.config
+ .getInt("akka.http.host-connection-pool.max-connections") / 2
+ private val batcher: Batcher[IndexRequest, Either[ArtifactStoreException, DocInfo]] =
+ new Batcher(500, maxOpenDbRequests)(doStore(_)(TransactionId.dbBatcher))
+
+ private val minStart = 0L
+ private val maxStart = Instant.now.toEpochMilli + TimeUnit.DAYS.toMillis(365 * 100) //100 years from now
+
+ override def store(activation: WhiskActivation, context: UserContext)(
+ implicit transid: TransactionId,
+ notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
+
+ val start =
+ transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] 'activations' document: '${activation.docid}'")
+
+ val path = activation.annotations
+ .getAs[String](WhiskActivation.pathAnnotation)
+ .getOrElse(s"${activation.namespace}/${activation.name}")
+ // Escape `_id` field as it's not permitted in ElasticSearch, add `path` field for search, and
+ // convert annotations to JsObject as ElasticSearch doesn't support array with mixed types
+ // response.result can be any type ElasticSearch also doesn't support that, so convert it to a string
+ val response = JsObject(
+ activation.response.toJsonObject.fields
+ .updated("result", JsString(activation.response.result.toJson.compactPrint)))
+ val payload = JsObject(
+ activation.toDocumentRecord.fields - "_id" ++ Map(
+ "path" -> JsString(path),
+ "@timestamp" -> JsString(activation.start.toString),
+ "annotations" -> activation.annotations.toJsObject,
+ "response" -> response))
+
+ val index = generateIndex(activation.namespace.namespace)
+ val op = indexInto(index, esType).doc(payload.toString).id(activation.docid.asString)
+
+ // always use batching
+ val res = batcher.put(op).map {
+ case Right(docInfo) =>
+ transid
+ .finished(this, start, s"[PUT] 'activations' completed document: '${activation.docid}', response: '$docInfo'")
+ docInfo
+ case Left(e: ArtifactStoreException) =>
+ transid.failed(
+ this,
+ start,
+ s"[PUT] 'activations' failed to put document: '${activation.docid}'; ${e.getMessage}.",
+ ErrorLevel)
+ throw PutException("error on 'put'")
+ }
+
+ reportFailure(res, start, failure => s"[PUT] 'activations' internal error, failure: '${failure.getMessage}'")
+ }
+
+ private def doStore(ops: Seq[IndexRequest])(
+ implicit transid: TransactionId): Future[Seq[Either[ArtifactStoreException, DocInfo]]] = {
+ val count = ops.size
+ val start = transid.started(this, LoggingMarkers.DATABASE_BULK_SAVE, s"'activations' saving $count documents")
+ val res = client
+ .execute {
+ bulk(ops)
+ }
+ .map { res =>
+ if (res.status == StatusCodes.OK.intValue || res.status == StatusCodes.Created.intValue) {
+ res.result.items.map { bulkRes =>
+ if (bulkRes.status == StatusCodes.OK.intValue || bulkRes.status == StatusCodes.Created.intValue)
+ Right(DocInfo(bulkRes.id))
+ else
+ Left(PutException(
+ s"Unexpected error: ${bulkRes.error.map(e => s"${e.`type`}:${e.reason}").getOrElse("unknown")}, code: ${bulkRes.status} on 'bulk_put'"))
+ }
+ } else {
+ transid.failed(
+ this,
+ start,
+ s"'activations' failed to put documents, http status: '${res.status}'",
+ ErrorLevel)
+ throw PutException("Unexpected http response code: " + res.status)
+ }
+ }
+
+ reportFailure(res, start, failure => s"[PUT] 'activations' internal error, failure: '${failure.getMessage}'")
+ }
+
+ override def get(activationId: ActivationId, context: UserContext)(
+ implicit transid: TransactionId): Future[WhiskActivation] = {
+
+ val start =
+ transid.started(this, LoggingMarkers.DATABASE_GET, s"[GET] 'activations' finding activation: '$activationId'")
+
+ val index = generateIndex(extractNamespace(activationId))
+ val res = client
+ .execute {
+ search(index) query { termQuery("_id", activationId.asString) }
+ }
+ .map { res =>
+ if (res.status == StatusCodes.OK.intValue) {
+ if (res.result.hits.total == 0) {
+ transid.finished(this, start, s"[GET] 'activations', document: '$activationId'; not found.")
+ throw NoDocumentException("not found on 'get'")
+ } else {
+ transid.finished(this, start, s"[GET] 'activations' completed: found activation '$activationId'")
+ deserializeHitToWhiskActivation(res.result.hits.hits(0))
+ }
+ } else if (res.status == StatusCodes.NotFound.intValue) {
+ transid.finished(this, start, s"[GET] 'activations', document: '$activationId'; not found.")
+ throw NoDocumentException("not found on 'get'")
+ } else {
+ transid
+ .finished(
+ this,
+ start,
+ s"[GET] 'activations' failed to get document: '$activationId'; http status: '${res.status}'")
+ throw GetException("Unexpected http response code: " + res.status)
+ }
+ } recoverWith {
+ case _: DeserializationException => throw DocumentUnreadable(Messages.corruptedEntity)
+ }
+
+ reportFailure(
+ res,
+ start,
+ failure => s"[GET] 'activations' internal error, doc: '$activationId', failure: '${failure.getMessage}'")
+ }
+
+ override def delete(activationId: ActivationId, context: UserContext)(
+ implicit transid: TransactionId,
+ notifier: Option[CacheChangeNotification]): Future[Boolean] = {
+
+ val start =
+ transid.started(this, LoggingMarkers.DATABASE_DELETE, s"[DEL] 'activations' deleting document: '$activationId'")
+
+ val index = generateIndex(extractNamespace(activationId))
+
+ val res = client
+ .execute {
+ deleteByQuery(index, esType, termQuery("_id", activationId.asString))
+ }
+ .map { res =>
+ if (res.status == StatusCodes.OK.intValue) {
+ if (res.result.deleted == 0) {
+ transid.finished(this, start, s"[DEL] 'activations', document: '$activationId'; not found.")
+ throw NoDocumentException("not found on 'delete'")
+ } else {
+ transid
+ .finished(
+ this,
+ start,
+ s"[DEL] 'activations' completed document: '$activationId', response: ${res.result}")
+ true
+ }
+ } else if (res.status == StatusCodes.NotFound.intValue) {
+ transid.finished(this, start, s"[DEL] 'activations', document: '$activationId'; not found.")
+ throw NoDocumentException("not found on 'delete'")
+ } else {
+ transid.failed(
+ this,
+ start,
+ s"[DEL] 'activations' failed to delete document: '$activationId'; http status: '${res.status}'",
+ ErrorLevel)
+ throw DeleteException("Unexpected http response code: " + res.status)
+ }
+ }
+
+ reportFailure(
+ res,
+ start,
+ failure => s"[DEL] 'activations' internal error, doc: '$activationId', failure: '${failure.getMessage}'")
+ }
+
+ override def countActivationsInNamespace(namespace: EntityPath,
+ name: Option[EntityPath] = None,
+ skip: Int,
+ since: Option[Instant] = None,
+ upto: Option[Instant] = None,
+ context: UserContext)(implicit transid: TransactionId): Future[JsObject] = {
+ require(skip >= 0, "skip should be non negative")
+ val start = transid.started(this, LoggingMarkers.DATABASE_QUERY, s"[COUNT] 'activations'")
+
+ val nameQuery = name
+ .map { path =>
+ matchPhraseQuery("path", namespace.addPath(path).asString)
+ }
+ .getOrElse {
+ matchPhraseQuery("namespace", namespace.asString)
+ }
+ val startRange = generateRangeQuery("start", since, upto)
+
+ val index = generateIndex(namespace.namespace)
+
+ val res = client
+ .execute {
+ count(index) query { must(nameQuery, startRange) }
+ }
+ .map { res =>
+ if (res.status == StatusCodes.OK.intValue) {
+ val out = if (res.result.count > skip) res.result.count - skip else 0L
+ transid.finished(this, start, s"[COUNT] 'activations' completed: count $out")
+ JsObject(WhiskActivation.collectionName -> JsNumber(out))
+ } else {
+ transid.failed(this, start, s"Unexpected http response code: ${res.status}", ErrorLevel)
+ throw QueryException("Unexpected http response code: " + res.status)
+ }
+ }
+
+ reportFailure(res, start, failure => s"[COUNT] 'activations' internal error, failure: '${failure.getMessage}'")
+ }
+
+ override def listActivationsMatchingName(
+ namespace: EntityPath,
+ name: EntityPath,
+ skip: Int,
+ limit: Int,
+ includeDocs: Boolean = false,
+ since: Option[Instant] = None,
+ upto: Option[Instant] = None,
+ context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = {
+
+ val nameQuery = matchPhraseQuery("path", namespace.addPath(name).asString)
+ listActivations(namespace, skip, limit, nameQuery, includeDocs, since, upto, context)
+ }
+
+ override def listActivationsInNamespace(
+ namespace: EntityPath,
+ skip: Int,
+ limit: Int,
+ includeDocs: Boolean = false,
+ since: Option[Instant] = None,
+ upto: Option[Instant] = None,
+ context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = {
+
+ val nameQuery = matchPhraseQuery("namespace", namespace.asString)
+ listActivations(namespace, skip, limit, nameQuery, includeDocs, since, upto, context)
+ }
+
+ private def listActivations(
+ namespace: EntityPath,
+ skip: Int,
+ limit: Int,
+ nameQuery: MatchPhrase,
+ includeDocs: Boolean = false,
+ since: Option[Instant] = None,
+ upto: Option[Instant] = None,
+ context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = {
+
+ require(skip >= 0, "skip should be non negative")
+ require(limit >= 0, "limit should be non negative")
+
+ val start = transid.started(this, LoggingMarkers.DATABASE_QUERY, s"[QUERY] 'activations'")
+ val startRange = generateRangeQuery("start", since, upto)
+ val index = generateIndex(namespace.namespace)
+
+ val res = client
+ .execute {
+ search(index) query { must(nameQuery, startRange) } sortByFieldDesc "start" limit limit from skip
+ }
+ .map { res =>
+ if (res.status == StatusCodes.OK.intValue) {
+ val out =
+ if (includeDocs)
+ Right(res.result.hits.hits.map(deserializeHitToWhiskActivation).toList)
+ else
+ Left(res.result.hits.hits.map(deserializeHitToWhiskActivation(_).summaryAsJson).toList)
+ transid.finished(this, start, s"[QUERY] 'activations' completed: matched ${res.result.hits.total}")
+ out
+
+ } else {
+ transid.failed(this, start, s"Unexpected http response code: ${res.status}", ErrorLevel)
+ throw QueryException("Unexpected http response code: " + res.status)
+ }
+ }
+
+ reportFailure(res, start, failure => s"failed to query activation with error ${failure.getMessage}")
+ }
+
+ private def deserializeHitToWhiskActivation(hit: SearchHit): WhiskActivation = {
+ restoreAnnotations(restoreResponse(hit.sourceAsString.parseJson.asJsObject)).convertTo[WhiskActivation]
+ }
+
+ private def restoreAnnotations(js: JsObject): JsObject = {
+ val annotations = js.fields
+ .get("annotations")
+ .map { anno =>
+ Try {
+ JsArray(anno.asJsObject.fields map { p =>
+ JsObject("key" -> JsString(p._1), "value" -> p._2)
+ } toSeq: _*)
+ }.getOrElse(JsArray.empty)
+ }
+ .getOrElse(JsArray.empty)
+ JsObject(js.fields.updated("annotations", annotations))
+ }
+
+ private def restoreResponse(js: JsObject): JsObject = {
+ val response = js.fields
+ .get("response")
+ .map { res =>
+ val temp = res.asJsObject.fields
+ Try {
+ val result = temp
+ .get("result")
+ .map { r =>
+ val JsString(data) = r
+ data.parseJson.asJsObject
+ }
+ .getOrElse(JsObject.empty)
+ JsObject(temp.updated("result", result))
+ }.getOrElse(JsObject(temp - "result"))
+ }
+ .getOrElse(JsObject.empty)
+ JsObject(js.fields.updated("response", response))
+ }
+
+ private def extractNamespace(activationId: ActivationId): String = {
+ activationId.toString.split("/")(0)
+ }
+
+ private def generateIndex(namespace: String): String = {
+ elasticSearchConfig.indexPattern.dropWhile(_ == '/') format namespace.toLowerCase
+ }
+
+ private def generateRangeQuery(key: String, since: Option[Instant], upto: Option[Instant]): RangeQuery = {
+ rangeQuery(key)
+ .gte(since.map(_.toEpochMilli).getOrElse(minStart))
+ .lte(upto.map(_.toEpochMilli).getOrElse(maxStart))
+ }
+}
+
+object ElasticSearchActivationStoreProvider extends ActivationStoreProvider {
+ override def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging) =
+ new ElasticSearchActivationStore(useBatching = true)(actorSystem, actorMaterializer, logging)
+}
diff --git a/tests/build.gradle b/tests/build.gradle
index fa9bff7..3234b0d 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -213,6 +213,7 @@ dependencies {
compile "io.fabric8:kubernetes-server-mock:${gradle.kube_client.version}"
compile "com.amazonaws:aws-java-sdk-s3:1.11.295"
+ compile "org.testcontainers:elasticsearch:1.12.3"
compile project(':common:scala')
compile project(':core:controller')
diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2
index e6a10df..98716e8 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -92,6 +92,10 @@ whisk {
url = "{{ user_images_registry | default('') }}"
}
}
+
+ elasticsearch {
+ docker-image = "{{ elasticsearch.docker_image | default('docker.elastic.co/elasticsearch/elasticsearch:' ~ elasticsearch.version ) }}"
+ }
}
#test-only overrides so that tests can override defaults in application.conf (todo: move all defaults to reference.conf)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStoreBehaviorBase.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStoreBehaviorBase.scala
new file mode 100644
index 0000000..a13ac8c
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStoreBehaviorBase.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.elasticsearch
+
+import org.scalatest.FlatSpec
+import org.apache.openwhisk.core.controller.test.WhiskAuthHelpers
+import org.apache.openwhisk.core.database.UserContext
+import org.apache.openwhisk.core.database.test.behavior.ActivationStoreBehaviorBase
+import org.apache.openwhisk.core.entity.{ActivationResponse, Parameters, WhiskActivation}
+import org.testcontainers.elasticsearch.ElasticsearchContainer
+import pureconfig.loadConfigOrThrow
+import spray.json.{JsObject, JsString}
+
+trait ElasticSearchActivationStoreBehaviorBase extends FlatSpec with ActivationStoreBehaviorBase {
+ val imageName = loadConfigOrThrow[String]("whisk.elasticsearch.docker-image")
+ val container = new ElasticsearchContainer(imageName)
+ container.start()
+
+ override def afterAll = {
+ container.close()
+ super.afterAll()
+ }
+
+ override def storeType = "ElasticSearch"
+
+ val creds = WhiskAuthHelpers.newIdentity()
+ override val context = UserContext(creds)
+
+ override lazy val activationStore = {
+ val storeConfig =
+ ElasticSearchActivationStoreConfig("http", container.getHttpHostAddress, "unittest-%s", "fake", "fake")
+ new ElasticSearchActivationStore(None, storeConfig, true)
+ }
+
+ // add result and annotations
+ override def newActivation(ns: String, actionName: String, start: Long): WhiskActivation = {
+ super
+ .newActivation(ns, actionName, start)
+ .copy(
+ response = ActivationResponse.success(Some(JsObject("name" -> JsString("whisker")))),
+ annotations = Parameters("database", "elasticsearch") ++ Parameters("type", "test"))
+ }
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStoreTests.scala
new file mode 100644
index 0000000..6fcc8db
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStoreTests.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.elasticsearch
+
+import java.time.Instant
+
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.database.UserContext
+import org.apache.openwhisk.core.database.test.behavior.ActivationStoreBehavior
+import org.apache.openwhisk.core.entity.{EntityPath, WhiskActivation}
+import org.apache.openwhisk.utils.retry
+
+@RunWith(classOf[JUnitRunner])
+class ElasticSearchActivationStoreTests
+ extends FlatSpec
+ with ElasticSearchActivationStoreBehaviorBase
+ with ActivationStoreBehavior {
+
+ override def checkGetActivation(activation: WhiskActivation)(implicit transid: TransactionId): Unit = {
+ retry(super.checkGetActivation(activation), 10)
+ }
+
+ override def checkDeleteActivation(activation: WhiskActivation)(implicit transid: TransactionId): Unit = {
+ retry(super.checkDeleteActivation(activation), 10)
+ }
+
+ override def checkQueryActivations(namespace: String,
+ name: Option[String] = None,
+ skip: Int = 0,
+ limit: Int = 1000,
+ includeDocs: Boolean = false,
+ since: Option[Instant] = None,
+ upto: Option[Instant] = None,
+ context: UserContext,
+ expected: IndexedSeq[WhiskActivation])(implicit transid: TransactionId): Unit = {
+ retry(super.checkQueryActivations(namespace, name, skip, limit, includeDocs, since, upto, context, expected), 10)
+ }
+
+ override def checkCountActivations(namespace: String,
+ name: Option[EntityPath] = None,
+ skip: Int = 0,
+ since: Option[Instant] = None,
+ upto: Option[Instant] = None,
+ context: UserContext,
+ expected: Long)(implicit transid: TransactionId): Unit = {
+ retry(super.checkCountActivations(namespace, name, skip, since, upto, context, expected), 10)
+ }
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ActivationStoreCRUDBehaviors.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ActivationStoreCRUDBehaviors.scala
index 506ccec..8427527 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ActivationStoreCRUDBehaviors.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ActivationStoreCRUDBehaviors.scala
@@ -19,12 +19,24 @@ package org.apache.openwhisk.core.database.test.behavior
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.database.NoDocumentException
-import org.apache.openwhisk.core.entity.ActivationId
+import org.apache.openwhisk.core.entity.{ActivationId, WhiskActivation}
import scala.util.Random
trait ActivationStoreCRUDBehaviors extends ActivationStoreBehaviorBase {
+ protected def checkStoreActivation(activation: WhiskActivation)(implicit transid: TransactionId): Unit = {
+ store(activation, context) shouldBe activation.docinfo
+ }
+
+ protected def checkDeleteActivation(activation: WhiskActivation)(implicit transid: TransactionId): Unit = {
+ activationStore.delete(ActivationId(activation.docid.asString), context).futureValue shouldBe true
+ }
+
+ protected def checkGetActivation(activation: WhiskActivation)(implicit transid: TransactionId): Unit = {
+ activationStore.get(ActivationId(activation.docid.asString), context).futureValue shouldBe activation
+ }
+
behavior of s"${storeType}ActivationStore store"
it should "put activation and get docinfo" in {
@@ -32,8 +44,7 @@ trait ActivationStoreCRUDBehaviors extends ActivationStoreBehaviorBase {
val namespace = s"ns_${Random.alphanumeric.take(4).mkString}"
val action = s"action1_${Random.alphanumeric.take(4).mkString}"
val activation = newActivation(namespace, action, 1L)
- val doc = store(activation, context)
- doc shouldBe activation.docinfo
+ checkStoreActivation(activation)
}
behavior of s"${storeType}ActivationStore delete"
@@ -44,7 +55,7 @@ trait ActivationStoreCRUDBehaviors extends ActivationStoreBehaviorBase {
val action = s"action1_${Random.alphanumeric.take(4).mkString}"
val activation = newActivation(namespace, action, 1L)
store(activation, context)
- activationStore.delete(ActivationId(activation.docid.asString), context).futureValue shouldBe true
+ checkDeleteActivation(activation)
}
it should "throws NoDocumentException when activation does not exist" in {
@@ -60,7 +71,7 @@ trait ActivationStoreCRUDBehaviors extends ActivationStoreBehaviorBase {
val action = s"action1_${Random.alphanumeric.take(4).mkString}"
val activation = newActivation(namespace, action, 1L)
store(activation, context)
- activationStore.get(ActivationId(activation.docid.asString), context).futureValue shouldBe activation
+ checkGetActivation(activation)
}
it should "throws NoDocumentException when activation does not exist" in {
diff --git a/tools/build/README.md b/tools/build/README.md
index fcbd356..0282482 100644
--- a/tools/build/README.md
+++ b/tools/build/README.md
@@ -30,6 +30,7 @@ The script is called `redo` because for most development, one will want to "redo
- usage information: `redo -h`
- initialize environment and `docker-machine` (for mac): `redo setup prereq`
- start CouchDB container and initialize DB with system and guest keys: `redo couchdb initdb`
+- start ElasticSearch container to store activations: `redo elasticsearch`
- build and deploy system: `redo deploy`
- run tests: `redo props tests`
diff --git a/tools/build/redo b/tools/build/redo
index 0534409..9ffa04e 100755
--- a/tools/build/redo
+++ b/tools/build/redo
@@ -237,6 +237,10 @@ Components = [
'recreate main db for entities',
yaml = 'wipe.yml'),
+ makeComponent('elasticsearch',
+ 'deploy elasticsearch',
+ modes = 'clean'),
+
makeComponent('build',
'build system',
yaml = False,
diff --git a/tools/travis/runUnitTests.sh b/tools/travis/runUnitTests.sh
index 762aea6..e2ef4ac 100755
--- a/tools/travis/runUnitTests.sh
+++ b/tools/travis/runUnitTests.sh
@@ -23,6 +23,7 @@ SCRIPTDIR=$(cd $(dirname "$0") && pwd)
ROOTDIR="$SCRIPTDIR/../.."
cd $ROOTDIR/tools/travis
+export TESTCONTAINERS_RYUK_DISABLED="true"
export ORG_GRADLE_PROJECT_testSetName="REQUIRE_ONLY_DB"
./scan.sh