You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2017/11/29 12:49:41 UTC

[GitHub] cbickel closed pull request #2810: Add couchdb clustering

cbickel closed pull request #2810: Add couchdb clustering
URL: https://github.com/apache/incubator-openwhisk/pull/2810
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of such a pull request once it is merged, the diff is
reproduced below for the sake of provenance:

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index f1a746d5b1..5b2ea446f2 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -213,6 +213,7 @@ nginx:
 # The key db.whisk.auth is the name of the authentication database where all keys of all users are stored.
 # The db_prefix is defined for each environment on its own. The CouchDb credentials are also defined for each environment on its own.
 db:
+  instances: "{{ groups['db'] | length }}"
   authkeys:
   - guest
   - whisk.system
@@ -239,7 +240,7 @@ linux:
   version: 4.4.0-31
 
 couchdb:
-  version: 2.0
+  version: 2.1
 
 docker:
   # The user to install docker for. Defaults to the ansible user if not set. This will be the user who is able to run
diff --git a/ansible/roles/couchdb/tasks/deploy.yml b/ansible/roles/couchdb/tasks/deploy.yml
index 27de14829b..88ec1eb50b 100644
--- a/ansible/roles/couchdb/tasks/deploy.yml
+++ b/ansible/roles/couchdb/tasks/deploy.yml
@@ -1,9 +1,9 @@
 ---
 # This role will run a CouchDB server on the db group
 
-- name: "Set node name to couchdb{{ groups['db'].index(inventory_hostname) }}"
+- name: "Set the coordinator to the first node"
   set_fact:
-    nodeName: "couchdb{{ groups['db'].index(inventory_hostname) }}"
+    coordinator: "{{ groups['db'][0] }}"
 
 - name: check if db credentials are valid for CouchDB
   fail: msg="The db provider in your {{ inventory_dir }}/group_vars/all is {{ db_provider }}, it has to be CouchDB, pls double check"
@@ -21,25 +21,27 @@
     volume_dir: "{{ instance.volume.fsmount | default( '/mnt/' + group_names|first, true ) }}:/usr/local/var/lib/couchdb"
   when: (block_device is defined) and (block_device in disk_status.stdout)
 
-- name: "pull the klaemo/couchdb:{{ couchdb.version }} image"
-  shell: "docker pull klaemo/couchdb:{{ couchdb.version }}"
+- name: "pull the apache/couchdb:{{ couchdb.version }} image"
+  shell: "docker pull apache/couchdb:{{ couchdb.version }}"
   retries: "{{ docker.pull.retries }}"
   delay: "{{ docker.pull.delay }}"
 
 - name: (re)start CouchDB
   docker_container:
     name: couchdb
-    image: klaemo/couchdb:{{ couchdb.version }}
+    image: apache/couchdb:{{ couchdb.version }}
     state: started
     recreate: true
     restart_policy: "{{ docker.restart.policy }}"
     volumes: "{{volume_dir | default([])}}"
     ports:
       - "{{ db_port }}:5984"
+      - "4369:4369"
+      - "9100:9100"
     env:
       COUCHDB_USER: "{{ db_username }}"
       COUCHDB_PASSWORD: "{{ db_password }}"
-      NODENAME: "{{ nodeName }}"
+      NODENAME: "{{ ansible_host }}"
 
 - name: wait until CouchDB in this host is up and running
   uri:
@@ -49,9 +51,48 @@
   retries: 12
   delay: 5
 
+- name: enable the cluster setup mode
+  uri:
+    url: "{{ db_protocol }}://{{ ansible_host }}:{{ db_port }}/_cluster_setup"
+    method: POST
+    body: >
+        {"action": "enable_cluster", "bind_address":"0.0.0.0", "username": "{{ db_username }}", "password":"{{ db_password }}", "port": {{ db_port }}, "node_count": "{{ groups['db'] | length }}", "remote_node": "{{ ansible_host }}", "remote_current_user": "{{ db_username }}", "remote_current_password": "{{ db_password }}"}
+    body_format: json
+    status_code: 201
+    user: "{{ db_username }}"
+    password: "{{ db_password }}"
+    force_basic_auth: yes
+  when: inventory_hostname == coordinator
+
+- name: add remote nodes to the cluster
+  uri:  # sent by every non-coordinator node to register itself with the coordinator
+    url: "{{ db_protocol }}://{{ hostvars[coordinator]['ansible_host'] | default(coordinator) }}:{{ db_port }}/_cluster_setup"  # coordinator is an inventory name; resolve its address
+    method: POST
+    body: >
+        {"action": "add_node", "host":"{{ ansible_host }}", "port": {{ db_port }}, "username": "{{ db_username }}", "password":"{{ db_password }}"}
+    body_format: json
+    status_code: 201
+    user: "{{ db_username }}"
+    password: "{{ db_password }}"
+    force_basic_auth: yes
+  when: inventory_hostname != coordinator
+
+- name: finish the cluster setup mode
+  uri:
+    url: "{{ db_protocol }}://{{ ansible_host }}:{{ db_port }}/_cluster_setup"
+    method: POST
+    body: >
+        {"action": "finish_cluster"}
+    body_format: json
+    status_code: 201
+    user: "{{ db_username }}"
+    password: "{{ db_password }}"
+    force_basic_auth: yes
+  when: inventory_hostname == coordinator
+
 - name: disable reduce limit on views
   uri:
-    url: "{{ db_protocol }}://{{ ansible_host }}:{{ db_port }}/_node/couchdb@{{ nodeName }}/_config/query_server_config/reduce_limit"
+    url: "{{ db_protocol }}://{{ ansible_host }}:{{ db_port }}/_node/couchdb@{{ ansible_host }}/_config/query_server_config/reduce_limit"
     method: PUT
     body: >
         "false"
@@ -60,3 +101,4 @@
     user: "{{ db_username }}"
     password: "{{ db_password }}"
     force_basic_auth: yes
+  when: inventory_hostname == coordinator
diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2
index 8ae7b8c0b0..925c246eec 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -92,6 +92,8 @@ db.whisk.activations={{ db.whisk.activations }}
 db.whisk.actions.ddoc={{ db.whisk.actions_ddoc }}
 db.whisk.activations.ddoc={{ db.whisk.activations_ddoc }}
 db.whisk.activations.filter.ddoc={{ db.whisk.activations_filter_ddoc }}
+db.hostsList={{ groups["db"] | map('extract', hostvars, 'ansible_host') | list | join(",") }}
+db.instances={{ db.instances }}
 
 apigw.auth.user={{apigw_auth_user}}
 apigw.auth.pwd={{apigw_auth_pwd}}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index c08fcf13f1..36c17afe0f 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -94,7 +94,6 @@ class WhiskConfig(requiredProperties: Map[String, String],
   val dbAuths = this(WhiskConfig.dbAuths)
   val dbWhisk = this(WhiskConfig.dbWhisk)
   val dbActivations = this(WhiskConfig.dbActivations)
-
   val mainDockerEndpoint = this(WhiskConfig.mainDockerEndpoint)
 
   val kafkaTopicsInvokerRetentionBytes = this(WhiskConfig.kafkaTopicsInvokerRetentionBytes)
@@ -222,6 +221,7 @@ object WhiskConfig {
 
   val controllerBlackboxFraction = "controller.blackboxFraction"
   val controllerInstances = "controller.instances"
+  val dbInstances = "db.instances"
 
   val loadbalancerInvokerBusyThreshold = "loadbalancer.invokerBusyThreshold"
 
@@ -233,6 +233,7 @@ object WhiskConfig {
   val redisHostPort = "redis.host.port"
 
   val invokerHostsList = "invoker.hosts"
+  val dbHostsList = "db.hostsList"
 
   val edgeHost = Map(edgeHostName -> null, edgeHostApiPort -> null)
   val invokerHosts = Map(invokerHostsList -> null)
diff --git a/tests/src/test/scala/common/WhiskProperties.java b/tests/src/test/scala/common/WhiskProperties.java
index 0971b23f91..b770c1ba74 100644
--- a/tests/src/test/scala/common/WhiskProperties.java
+++ b/tests/src/test/scala/common/WhiskProperties.java
@@ -17,8 +17,6 @@
 
 package common;
 
-import static org.junit.Assert.assertTrue;
-
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -26,6 +24,8 @@
 import java.nio.file.Files;
 import java.util.Properties;
 
+import static org.junit.Assert.assertTrue;
+
 /**
  * Properties that describe a whisk installation
  */
@@ -231,6 +231,14 @@ public static String getControllerHosts() {
         return whiskProperties.getProperty("controller.hosts");
     }
 
+    public static String getDBHosts() {
+        return whiskProperties.getProperty("db.hostsList");
+    }
+
+    public static int getDBPort() {
+        return Integer.parseInt(whiskProperties.getProperty("db.port"));
+    }
+
     public static int getControllerBasePort() {
         return Integer.parseInt(whiskProperties.getProperty("controller.host.basePort"));
     }
@@ -239,6 +247,10 @@ public static String getBaseControllerHost() {
         return getControllerHosts().split(",")[0];
     }
 
+    public static String getBaseDBHost() {
+        return getDBHosts().split(",")[0];
+    }
+
     public static String getBaseControllerAddress() {
         return getBaseControllerHost() + ":" + getControllerBasePort();
     }
diff --git a/tests/src/test/scala/ha/ShootComponentsTests.scala b/tests/src/test/scala/ha/ShootComponentsTests.scala
index 5fd1525936..c6e65dc8a5 100644
--- a/tests/src/test/scala/ha/ShootComponentsTests.scala
+++ b/tests/src/test/scala/ha/ShootComponentsTests.scala
@@ -19,32 +19,35 @@ package ha
 import java.io.File
 import java.time.Instant
 
-import scala.concurrent.Future
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration.DurationInt
 import scala.util.Try
-
 import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.junit.JUnitRunner
-
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.model.HttpRequest
 import akka.http.scaladsl.model.StatusCodes
 import akka.http.scaladsl.unmarshalling.Unmarshal
 import akka.stream.ActorMaterializer
-import common.TestUtils
-import common.WhiskProperties
+import common._
 import common.rest.WskRest
-import common.WskActorSystem
-import common.WskProps
-import common.WskTestHelpers
+import spray.json._
+import spray.json.DefaultJsonProtocol._
 import whisk.core.WhiskConfig
+import whisk.core.database.test.ExtendedCouchDbRestClient
 import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
-class ShootComponentsTests extends FlatSpec with Matchers with WskTestHelpers with ScalaFutures with WskActorSystem {
+class ShootComponentsTests
+    extends FlatSpec
+    with Matchers
+    with WskTestHelpers
+    with ScalaFutures
+    with WskActorSystem
+    with StreamLogging {
 
   implicit val wskprops = WskProps()
   val wsk = new WskRest
@@ -63,18 +66,49 @@ class ShootComponentsTests extends FlatSpec with Matchers with WskTestHelpers wi
   val controller0DockerHost = WhiskProperties.getBaseControllerHost() + ":" + WhiskProperties.getProperty(
     WhiskConfig.dockerPort)
 
-  def restartComponent(host: String, component: String) = {
+  val couchDB0DockerHost = WhiskProperties.getBaseDBHost() + ":" + WhiskProperties.getProperty(WhiskConfig.dockerPort)
+
+  val dbProtocol = WhiskProperties.getProperty(WhiskConfig.dbProtocol)
+  val dbHostsList = WhiskProperties.getDBHosts
+  val dbPort = WhiskProperties.getProperty(WhiskConfig.dbPort)
+  val dbUsername = WhiskProperties.getProperty(WhiskConfig.dbUsername)
+  val dbPassword = WhiskProperties.getProperty(WhiskConfig.dbPassword)
+  val dbPrefix = WhiskProperties.getProperty(WhiskConfig.dbPrefix)
+  val dbWhiskAuth = WhiskProperties.getProperty(WhiskConfig.dbAuths)
+
+  private def getDockerCommand(host: String, component: String, cmd: String) = {
     def file(path: String) = Try(new File(path)).filter(_.exists).map(_.getAbsolutePath).toOption
+
     val docker = (file("/usr/bin/docker") orElse file("/usr/local/bin/docker")).getOrElse("docker")
 
-    val cmd = Seq(docker, "--host", host, "restart", component)
+    Seq(docker, "--host", host, cmd, component)
+  }
+
+  def restartComponent(host: String, component: String) = {
+    val cmd: Seq[String] = getDockerCommand(host, component, "restart")
+    println(s"Running command: ${cmd.mkString(" ")}")
+
+    TestUtils.runCmd(0, new File("."), cmd: _*)
+  }
+
+  def stopComponent(host: String, component: String) = {
+    val cmd: Seq[String] = getDockerCommand(host, component, "stop")
+    println(s"Running command: ${cmd.mkString(" ")}")
+
+    TestUtils.runCmd(0, new File("."), cmd: _*)
+  }
+
+  def startComponent(host: String, component: String) = {
+    val cmd: Seq[String] = getDockerCommand(host, component, "start")
     println(s"Running command: ${cmd.mkString(" ")}")
 
     TestUtils.runCmd(0, new File("."), cmd: _*)
   }
 
-  def ping(host: String, port: Int) = {
-    val response = Try { Http().singleRequest(HttpRequest(uri = s"http://$host:$port/ping")).futureValue }.toOption
+  def ping(host: String, port: Int, path: String = "/") = {
+    val response = Try {
+      Http().singleRequest(HttpRequest(uri = s"http://$host:$port$path")).futureValue
+    }.toOption
 
     response.map { res =>
       (res.status, Unmarshal(res).to[String].futureValue)
@@ -87,17 +121,34 @@ class ShootComponentsTests extends FlatSpec with Matchers with WskTestHelpers wi
     val host = WhiskProperties.getProperty("controller.hosts").split(",")(instance)
     val port = WhiskProperties.getControllerBasePort + instance
 
-    val res = ping(host, port)
+    val res = ping(host, port, "/ping")
     res == Some((StatusCodes.OK, "pong"))
   }
 
+  /** Reports whether CouchDB node `instance` (an index into db.hostsList) serves its welcome page. */
+  def isDBAlive(instance: Int): Boolean = {
+    require(instance >= 0 && instance < 2, "DB instance not known.")
+
+    // Use the db.hostsList accessor: that is the property this change writes; "db.hosts" is not.
+    val host = WhiskProperties.getDBHosts.split(",")(instance)
+    // Each node publishes the same db.port on its own host, so no per-instance offset applies.
+    val port = WhiskProperties.getDBPort
+
+    // Match the welcome marker only; the full body embeds the exact CouchDB version and features.
+    ping(host, port).exists { case (status, body) => status == StatusCodes.OK && body.contains("\"couchdb\":\"Welcome\"") }
+  }
+
   def doRequests(amount: Int, actionName: String): Seq[(Int, Int)] = {
     (0 until amount).map { i =>
       val start = Instant.now
 
       // Do POSTs and GETs
-      val invokeExit = Future { wsk.action.invoke(actionName, expectedExitCode = TestUtils.DONTCARE_EXIT).exitCode }
-      val getExit = Future { wsk.action.get(actionName, expectedExitCode = TestUtils.DONTCARE_EXIT).exitCode }
+      val invokeExit = Future {
+        wsk.action.invoke(actionName, expectedExitCode = TestUtils.DONTCARE_EXIT).exitCode
+      }
+      val getExit = Future {
+        wsk.action.get(actionName, expectedExitCode = TestUtils.DONTCARE_EXIT).exitCode
+      }
 
       println(s"Done rerquests with responses: invoke: ${invokeExit.futureValue} and get: ${getExit.futureValue}")
 
@@ -150,4 +201,119 @@ class ShootComponentsTests extends FlatSpec with Matchers with WskTestHelpers wi
       isControllerAlive(1) shouldBe true
     }
   }
+
+  behavior of "CouchDB HA"
+
+  // HA scenario: write via node 0, stop node 0, read/write via node 1, restart node 0, read via node 0.
+  // The body only runs when at least two db instances are configured; otherwise nothing is checked.
+  it should "be able to retrieve documents from couchdb1 if couchdb0 goes down" in withAssetCleaner(wskprops) {
+    (wp, assetHelper) =>
+      if (WhiskProperties.getProperty(WhiskConfig.dbInstances).toInt >= 2) {
+
+        val dbName: String = dbWhiskAuth
+        // One REST client per CouchDB node, so each request targets a specific cluster member.
+        def clientFor(host: String) =
+          new ExtendedCouchDbRestClient(dbProtocol, host, dbPort.toInt, dbUsername, dbPassword, dbName)
+        val dbHosts = dbHostsList.split(",")
+        val db1 = clientFor(dbHosts(0))
+        val db2 = clientFor(dbHosts(1))
+
+        // Ids of the documents behind a view result's rows (CouchDB reports them under "id").
+        // Assumes subjects/identities emits a row keyed by the namespace name - TODO confirm.
+        def rowIds(viewResult: JsObject) = viewResult.fields("rows").convertTo[List[JsObject]].map(_.fields("id"))
+
+        println("Creating test document")
+        val docId = "couchdb-ha-test"
+        val testDocument = JsObject(
+          "_id" -> docId.toJson,
+          "namespaces" -> JsArray(
+            JsObject(
+              "name" -> docId.toJson,
+              "uuid" -> "789c46b1-71f6-4ed5-8c54-816aa4f8c502".toJson,
+              "key" -> "abczO3xZCLrMN6v2BKK1dXYFpXlPkccOFqm12CdAsMgRU4VrNZ9lyGVCGuMDGIwP".toJson)),
+          "subject" -> docId.toJson)
+
+        val docId2 = "couchdb-ha-test2"
+        val testDocument2 = JsObject(
+          "_id" -> docId2.toJson,
+          "namespaces" -> JsArray(
+            JsObject(
+              "name" -> docId2.toJson,
+              "uuid" -> "789c46b1-71f6-4ed5-8c54-816aa4f8c502".toJson,
+              "key" -> "abczO3xZCLrMN6v2BKK1dXYFpXlPkccOFqm12CdAsMgRU4VrNZ9lyGVCGuMDGIwP".toJson)),
+          "subject" -> docId2.toJson)
+
+        isDBAlive(0) shouldBe true
+
+        retry(db1.putDoc(docId, testDocument))
+
+        stopComponent(couchDB0DockerHost, "couchdb")
+
+        retry({
+          isDBAlive(0) shouldBe false
+        }, 100, Some(100.milliseconds))
+
+        retry({
+          val result = Await.result(db2.getDoc(docId), 15.seconds)
+          result should be('right)
+          result.right.get.getFields("_id") shouldBe testDocument.getFields("_id")
+        })
+
+        // The lookup must actually return the test document, not merely succeed.
+        retry(
+          {
+            val result = Await.result(
+              db2.executeView("subjects", "identities")(startKey = List(docId), endKey = List(docId)),
+              15.seconds)
+            result should be('right)
+            rowIds(result.right.get) should contain(docId.toJson)
+          },
+          100,
+          Some(100.milliseconds))
+
+        retry(db2.putDoc(docId2, testDocument2))
+
+        isDBAlive(0) shouldBe false
+
+        startComponent(couchDB0DockerHost, "couchdb")
+
+        retry({
+          isDBAlive(0) shouldBe true
+        }, 100, Some(100.milliseconds))
+
+        retry({
+          val result = Await.result(db1.getDoc(docId2), 15.seconds)
+          result should be('right)
+          result.right.get.getFields("_id") shouldBe testDocument2.getFields("_id")
+        })
+
+        // Same check through the restarted node: the document written while it was down must be indexed.
+        retry(
+          {
+            val result = Await.result(
+              db1.executeView("subjects", "identities")(startKey = List(docId2), endKey = List(docId2)),
+              15.seconds)
+            result should be('right)
+            rowIds(result.right.get) should contain(docId2.toJson)
+          },
+          100,
+          Some(100.milliseconds))
+
+        val doc1Result = Await.result(db1.getDoc(docId), 15.seconds)
+        val doc2Result = Await.result(db1.getDoc(docId2), 15.seconds)
+        val rev1 = doc1Result.right.get.fields.get("_rev").get.convertTo[String]
+        val rev2 = doc2Result.right.get.fields.get("_rev").get.convertTo[String]
+        Await.result(db1.deleteDoc(docId, rev1), 15.seconds)
+        Await.result(db1.deleteDoc(docId2, rev2), 15.seconds)
+
+        retry({
+          val result = Await.result(db1.getDoc(docId), 15.seconds)
+          result should be('left)
+        })
+        retry({
+          val result = Await.result(db1.getDoc(docId2), 15.seconds)
+          result should be('left)
+        })
+      }
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services