You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2018/04/17 22:23:14 UTC

[incubator-openwhisk] branch master updated: Implement possibility to discover cluster nodes via Akka Management. (#3548)

This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 166189a  Implement possibility to discover cluster nodes via Akka Management. (#3548)
166189a is described below

commit 166189a8f15c99d9237e7020865a34c5bc92a0c2
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Wed Apr 18 00:23:10 2018 +0200

    Implement possibility to discover cluster nodes via Akka Management. (#3548)
    
    * Implement possibility to discover cluster nodes via Akka Management.
    
    Akka Management makes it possible to discover cluster nodes via a cluster API (like Kubernetes or Mesos). This also cleans up the way we provide seed-nodes if static configuration is required.
    
    Co-authored-by: Tyson Norris <tn...@adobe.com>
    
    * Remove unneeded SeedNodesProvider.
    
    * Documentation.
---
 ansible/roles/controller/tasks/deploy.yml          | 39 ++++++++------
 .../src/main/scala/whisk/core/WhiskConfig.scala    |  1 +
 core/controller/build.gradle                       |  3 ++
 .../controller/src/main/resources/application.conf |  2 -
 core/controller/src/main/resources/reference.conf  |  3 ++
 .../core/loadBalancer/SeedNodesProvider.scala      | 41 ---------------
 .../ShardingContainerPoolBalancer.scala            | 24 ++++++---
 docs/deploy.md                                     |  7 +++
 .../loadBalancer/test/SeedNodesProviderTests.scala | 61 ----------------------
 9 files changed, 54 insertions(+), 127 deletions(-)

diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 87167f5..d543a40 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -93,21 +93,9 @@
   set_fact:
     controller_args: "{{ controller.arguments }} {{ jmx.jvmCommonArgs }} -Djava.rmi.server.hostname={{ inventory_hostname }} -Dcom.sun.management.jmxremote.rmi.port={{ jmx.rmiBasePortController + (controller_index | int) }} -Dcom.sun.management.jmxremote.port={{ jmx.basePortController + (controller_index | int) }}"
 
-- name: create seed nodes list
+- name: populate environment variables for controller
   set_fact:
-    seed_nodes_list: "{{ seed_nodes_list | default([]) }} + [ \"{{item.1}}:{{controller.akka.cluster.basePort+item.0}}\" ]"
-  with_indexed_items:
-  - "{{ controller.akka.cluster.seedNodes }}"
-
-- name: (re)start controller
-  docker_container:
-    name: "{{ controller_name }}"
-    image: "{{ docker_registry }}{{ docker.image.prefix }}/controller:{{ docker.image.tag }}"
-    state: started
-    recreate: true
-    restart_policy: "{{ docker.restart.policy }}"
-    hostname: "{{ controller_name }}"
-    env:
+    controller_env:
       "JAVA_OPTS": "-Xmx{{ controller.heap }} -XX:+CrashOnOutOfMemoryError -XX:+UseGCOverheadLimit -XX:ErrorFile=/logs/java_error.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/logs"
       "CONTROLLER_OPTS": "{{ controller_args | default(controller.arguments) }}"
       "CONTROLLER_INSTANCES": "{{ controller.instances }}"
@@ -171,7 +159,6 @@
       "CONFIG_whisk_runtimes_bypassPullForLocalImages": "{{ runtimes_bypass_pull_for_local_images | default() }}"
       "CONFIG_whisk_runtimes_localImagePrefix": "{{ runtimes_local_image_prefix | default() }}"
 
-      "AKKA_CLUSTER_SEED_NODES": "{{seed_nodes_list | join(' ') }}"
       "METRICS_KAMON": "{{ metrics.kamon.enabled }}"
       "METRICS_KAMON_TAGS": "{{ metrics.kamon.tags }}"
       "METRICS_LOG": "{{ metrics.log.enabled }}"
@@ -199,6 +186,28 @@
       "CONFIG_logback_log_level": "{{ controller.loglevel }}"
 
       "CONFIG_whisk_transactions_stride": "{{ transactions.stride | default() }}"
+
+- name: create seed nodes list
+  set_fact:
+    seed_nodes_list: "{{ seed_nodes_list | default([]) }} + [ '{{ item.1 }}:{{ controller.akka.cluster.basePort+item.0 }}' ]"
+  with_indexed_items:
+  - "{{ controller.akka.cluster.seedNodes }}"
+
+- name: add seed nodes to controller environment
+  set_fact:
+    controller_env: "{{ controller_env | default({}) | combine({'CONFIG_akka_cluster_seedNodes_' ~ item.0: 'akka.tcp://controller-actor-system@' ~ item.1}) }}"
+  with_indexed_items: "{{ seed_nodes_list }}"
+
+
+- name: (re)start controller
+  docker_container:
+    name: "{{ controller_name }}"
+    image: "{{ docker_registry }}{{ docker.image.prefix }}/controller:{{ docker.image.tag }}"
+    state: started
+    recreate: true
+    restart_policy: "{{ docker.restart.policy }}"
+    hostname: "{{ controller_name }}"
+    env: "{{ controller_env }}"
     volumes:
       - "{{ whisk_logs_dir }}/{{ controller_name }}:/logs"
       - "{{ controller.confdir }}/{{ controller_name }}:/conf"
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index c9683d6..36f74e0 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -205,6 +205,7 @@ object WhiskConfig {
 }
 
 object ConfigKeys {
+  val cluster = "whisk.cluster"
   val loadbalancer = "whisk.loadbalancer"
 
   val couchdb = "whisk.couchdb"
diff --git a/core/controller/build.gradle b/core/controller/build.gradle
index 2a184d1..8a50af3 100644
--- a/core/controller/build.gradle
+++ b/core/controller/build.gradle
@@ -15,6 +15,9 @@ repositories {
 
 dependencies {
     compile "org.scala-lang:scala-library:${gradle.scala.version}"
+    compile 'com.lightbend.akka.management:akka-management-cluster-bootstrap_2.11:0.11.0'
+    compile 'com.lightbend.akka.discovery:akka-discovery-kubernetes-api_2.11:0.11.0'
+    compile 'com.lightbend.akka.discovery:akka-discovery-marathon-api_2.11:0.11.0'
     compile project(':common:scala')
 }
 
diff --git a/core/controller/src/main/resources/application.conf b/core/controller/src/main/resources/application.conf
index 446d5fe..e72bbc3 100644
--- a/core/controller/src/main/resources/application.conf
+++ b/core/controller/src/main/resources/application.conf
@@ -67,7 +67,5 @@ akka {
   cluster {
     # Disable legacy metrics in akka-cluster.
     metrics.enabled=off
-
-    #distributed-data.notify-subscribers-interval = 0.01
   }
 }
diff --git a/core/controller/src/main/resources/reference.conf b/core/controller/src/main/resources/reference.conf
index ce13c1e..ec7cd8a 100644
--- a/core/controller/src/main/resources/reference.conf
+++ b/core/controller/src/main/resources/reference.conf
@@ -1,5 +1,8 @@
 
 whisk {
+  cluster {
+    use-cluster-bootstrap: false
+  }
   loadbalancer {
     invoker-busy-threshold: 4
     blackbox-fraction: 10%
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/SeedNodesProvider.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/SeedNodesProvider.scala
deleted file mode 100644
index be630a4..0000000
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/SeedNodesProvider.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer
-
-import akka.actor.Address
-
-import scala.collection.immutable.Seq
-
-trait SeedNodesProvider {
-  def getSeedNodes(): Seq[Address]
-}
-
-class StaticSeedNodesProvider(seedNodes: String, actorSystemName: String) extends SeedNodesProvider {
-  def getSeedNodes(): Seq[Address] = {
-    seedNodes
-      .split(' ')
-      .flatMap { rawNodes =>
-        val ipWithPort = rawNodes.split(":")
-        ipWithPort match {
-          case Array(host, port) => Seq(Address("akka.tcp", actorSystemName, host, port.toInt))
-          case _                 => Seq.empty[Address]
-        }
-      }
-      .toIndexedSeq
-  }
-}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index d10f03e..e99b972 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -25,6 +25,8 @@ import akka.actor.{Actor, ActorSystem, Cancellable, Props}
 import akka.cluster.ClusterEvent._
 import akka.cluster.{Cluster, Member, MemberStatus}
 import akka.event.Logging.InfoLevel
+import akka.management.AkkaManagement
+import akka.management.cluster.bootstrap.ClusterBootstrap
 import akka.stream.ActorMaterializer
 import org.apache.kafka.clients.producer.RecordMetadata
 import pureconfig._
@@ -57,13 +59,12 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
   private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
 
   /** Build a cluster of all loadbalancers */
-  private val seedNodesProvider = new StaticSeedNodesProvider(config.controllerSeedNodes, actorSystem.name)
-  private val seedNodes = seedNodesProvider.getSeedNodes()
-
-  private val cluster: Option[Cluster] = if (seedNodes.nonEmpty) {
-    val cluster = Cluster(actorSystem)
-    cluster.joinSeedNodes(seedNodes)
-    Some(cluster)
+  private val cluster: Option[Cluster] = if (loadConfigOrThrow[ClusterConfig](ConfigKeys.cluster).useClusterBootstrap) {
+    AkkaManagement(actorSystem).start()
+    ClusterBootstrap(actorSystem).start()
+    Some(Cluster(actorSystem))
+  } else if (loadConfigOrThrow[Seq[String]]("akka.cluster.seed-nodes").nonEmpty) {
+    Some(Cluster(actorSystem))
   } else {
     None
   }
@@ -305,7 +306,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
     logging: Logging,
     materializer: ActorMaterializer): LoadBalancer = new ShardingContainerPoolBalancer(whiskConfig, instance)
 
-  def requiredProperties: Map[String, String] = kafkaHosts ++ Map(controllerSeedNodes -> null)
+  def requiredProperties: Map[String, String] = kafkaHosts
 
   /** Generates a hash based on the string representation of namespace and action */
   def generateHash(namespace: EntityName, action: FullyQualifiedEntityName): Int = {
@@ -472,6 +473,13 @@ case class ShardingContainerPoolBalancerState(
 }
 
 /**
+ * Configuration for the cluster created between loadbalancers.
+ *
+ * @param useClusterBootstrap Whether or not to use a bootstrap mechanism
+ */
+case class ClusterConfig(useClusterBootstrap: Boolean)
+
+/**
  * Configuration for the sharding container pool balancer.
  *
  * @param blackboxFraction the fraction of all invokers to use exclusively for blackboxes
diff --git a/docs/deploy.md b/docs/deploy.md
index 1ad8456..a1abad0 100644
--- a/docs/deploy.md
+++ b/docs/deploy.md
@@ -4,6 +4,13 @@ This page documents configuration options that should be considered when deployi
 
 The system can be configured to use Akka clustering to manage the distributed state of the Contoller's load balancing algorithm.  This imposes the following constraints on a deployment
 
+## Cluster setup
+
+To set up a cluster, the controllers need to be able to discover each other. There are 2 basic ways to achieve this:
+
+1. Provide the so-called **seed-nodes** explicitly on deployment. Essentially you have a static list of possible seed nodes which are used to build a cluster. In an Ansible-based deployment, they are determined for you from the `hosts` file. On any other deployment model, the `CONFIG_akka_cluster_seedNodes_$i` variables (one per seed node, with `$i` starting at 0) will need to be provided according to the [akka cluster documentation](https://doc.akka.io/docs/akka/2.5/cluster-usage.html#joining-to-seed-nodes).
+2. Discover the nodes from an external service. This is built upon [akka-management](https://developer.lightbend.com/docs/akka-management/current/) and by default [Kubernetes](https://developer.lightbend.com/docs/akka-management/current/discovery.html#discovery-method-kubernetes-api) and [Mesos (Marathon)](https://developer.lightbend.com/docs/akka-management/current/discovery.html#discovery-method-marathon-api) are supported. You can refer to the respective documentation above to configu [...]
+
 
 ## Controller nodes must have static IPs/Port combination.
 
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/SeedNodesProviderTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/SeedNodesProviderTests.scala
deleted file mode 100644
index ef4ae85..0000000
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/SeedNodesProviderTests.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer.test
-
-import akka.actor.Address
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{FlatSpec, Matchers}
-import whisk.core.loadBalancer.StaticSeedNodesProvider
-
-@RunWith(classOf[JUnitRunner])
-class SeedNodesProviderTests extends FlatSpec with Matchers {
-
-  val actorSystemName = "controller-actor-system"
-  val host = "192.168.99.100"
-  val port = 8000
-  val oneFakeSeedNode = s"$host:$port"
-  val twoFakeSeedNodes = s"$host:$port $host:${port + 1}"
-  val twoFakeSeedNodesWithoutWhitespace = s"$host:$port$host:${port + 1}"
-
-  behavior of "StaticSeedNodesProvider"
-
-  it should "return a sequence with a single seed node" in {
-    val seedNodesProvider = new StaticSeedNodesProvider(oneFakeSeedNode, actorSystemName)
-    val seqWithOneSeedNode = seedNodesProvider.getSeedNodes()
-    seqWithOneSeedNode shouldBe IndexedSeq(Address("akka.tcp", actorSystemName, host, port))
-  }
-  it should "return a sequence with more then one seed node" in {
-    val seedNodesProvider = new StaticSeedNodesProvider(twoFakeSeedNodes, actorSystemName)
-    val seqWithTwoSeedNodes = seedNodesProvider.getSeedNodes()
-    seqWithTwoSeedNodes shouldBe IndexedSeq(
-      Address("akka.tcp", actorSystemName, host, port),
-      Address("akka.tcp", actorSystemName, host, port + 1))
-  }
-  it should "return an empty sequence if seed nodes are provided in the wrong format" in {
-    val seedNodesProvider = new StaticSeedNodesProvider(twoFakeSeedNodesWithoutWhitespace, actorSystemName)
-    val noneResponse = seedNodesProvider.getSeedNodes()
-    noneResponse shouldBe IndexedSeq.empty[Address]
-  }
-  it should "return an empty sequence if no seed nodes specified" in {
-    val seedNodesProvider = new StaticSeedNodesProvider("", actorSystemName)
-    val noneResponse = seedNodesProvider.getSeedNodes()
-    noneResponse shouldBe IndexedSeq.empty[Address]
-  }
-
-}

-- 
To stop receiving notification emails like this one, please contact
tysonnorris@apache.org.