You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/04/17 22:23:12 UTC

[GitHub] tysonnorris closed pull request #3548: Implement possibility to discover cluster nodes via Akka Management.

tysonnorris closed pull request #3548: Implement possibility to discover cluster nodes via Akka Management.
URL: https://github.com/apache/incubator-openwhisk/pull/3548
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would otherwise hide it once the pull request is merged):

diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 87167f5041..d543a40442 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -93,21 +93,9 @@
   set_fact:
     controller_args: "{{ controller.arguments }} {{ jmx.jvmCommonArgs }} -Djava.rmi.server.hostname={{ inventory_hostname }} -Dcom.sun.management.jmxremote.rmi.port={{ jmx.rmiBasePortController + (controller_index | int) }} -Dcom.sun.management.jmxremote.port={{ jmx.basePortController + (controller_index | int) }}"
 
-- name: create seed nodes list
+- name: populate environment variables for controller
   set_fact:
-    seed_nodes_list: "{{ seed_nodes_list | default([]) }} + [ \"{{item.1}}:{{controller.akka.cluster.basePort+item.0}}\" ]"
-  with_indexed_items:
-  - "{{ controller.akka.cluster.seedNodes }}"
-
-- name: (re)start controller
-  docker_container:
-    name: "{{ controller_name }}"
-    image: "{{ docker_registry }}{{ docker.image.prefix }}/controller:{{ docker.image.tag }}"
-    state: started
-    recreate: true
-    restart_policy: "{{ docker.restart.policy }}"
-    hostname: "{{ controller_name }}"
-    env:
+    controller_env:
       "JAVA_OPTS": "-Xmx{{ controller.heap }} -XX:+CrashOnOutOfMemoryError -XX:+UseGCOverheadLimit -XX:ErrorFile=/logs/java_error.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/logs"
       "CONTROLLER_OPTS": "{{ controller_args | default(controller.arguments) }}"
       "CONTROLLER_INSTANCES": "{{ controller.instances }}"
@@ -171,7 +159,6 @@
       "CONFIG_whisk_runtimes_bypassPullForLocalImages": "{{ runtimes_bypass_pull_for_local_images | default() }}"
       "CONFIG_whisk_runtimes_localImagePrefix": "{{ runtimes_local_image_prefix | default() }}"
 
-      "AKKA_CLUSTER_SEED_NODES": "{{seed_nodes_list | join(' ') }}"
       "METRICS_KAMON": "{{ metrics.kamon.enabled }}"
       "METRICS_KAMON_TAGS": "{{ metrics.kamon.tags }}"
       "METRICS_LOG": "{{ metrics.log.enabled }}"
@@ -199,6 +186,28 @@
       "CONFIG_logback_log_level": "{{ controller.loglevel }}"
 
       "CONFIG_whisk_transactions_stride": "{{ transactions.stride | default() }}"
+
+- name: create seed nodes list
+  set_fact:
+    seed_nodes_list: "{{ seed_nodes_list | default([]) }} + [ '{{ item.1 }}:{{ controller.akka.cluster.basePort+item.0 }}' ]"
+  with_indexed_items:
+  - "{{ controller.akka.cluster.seedNodes }}"
+
+- name: add seed nodes to controller environment
+  set_fact:
+    controller_env: "{{ controller_env | default({}) | combine({'CONFIG_akka_cluster_seedNodes_' ~ item.0: 'akka.tcp://controller-actor-system@' ~ item.1}) }}"
+  with_indexed_items: "{{ seed_nodes_list }}"
+
+
+- name: (re)start controller
+  docker_container:
+    name: "{{ controller_name }}"
+    image: "{{ docker_registry }}{{ docker.image.prefix }}/controller:{{ docker.image.tag }}"
+    state: started
+    recreate: true
+    restart_policy: "{{ docker.restart.policy }}"
+    hostname: "{{ controller_name }}"
+    env: "{{ controller_env }}"
     volumes:
       - "{{ whisk_logs_dir }}/{{ controller_name }}:/logs"
       - "{{ controller.confdir }}/{{ controller_name }}:/conf"
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index c9683d62db..36f74e02fd 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -205,6 +205,7 @@ object WhiskConfig {
 }
 
 object ConfigKeys {
+  val cluster = "whisk.cluster"
   val loadbalancer = "whisk.loadbalancer"
 
   val couchdb = "whisk.couchdb"
diff --git a/core/controller/build.gradle b/core/controller/build.gradle
index 2a184d15d7..8a50af38b9 100644
--- a/core/controller/build.gradle
+++ b/core/controller/build.gradle
@@ -15,6 +15,9 @@ repositories {
 
 dependencies {
     compile "org.scala-lang:scala-library:${gradle.scala.version}"
+    compile 'com.lightbend.akka.management:akka-management-cluster-bootstrap_2.11:0.11.0'
+    compile 'com.lightbend.akka.discovery:akka-discovery-kubernetes-api_2.11:0.11.0'
+    compile 'com.lightbend.akka.discovery:akka-discovery-marathon-api_2.11:0.11.0'
     compile project(':common:scala')
 }
 
diff --git a/core/controller/src/main/resources/application.conf b/core/controller/src/main/resources/application.conf
index 446d5fed34..e72bbc3b38 100644
--- a/core/controller/src/main/resources/application.conf
+++ b/core/controller/src/main/resources/application.conf
@@ -67,7 +67,5 @@ akka {
   cluster {
     # Disable legacy metrics in akka-cluster.
     metrics.enabled=off
-
-    #distributed-data.notify-subscribers-interval = 0.01
   }
 }
diff --git a/core/controller/src/main/resources/reference.conf b/core/controller/src/main/resources/reference.conf
index ce13c1e124..ec7cd8a4cd 100644
--- a/core/controller/src/main/resources/reference.conf
+++ b/core/controller/src/main/resources/reference.conf
@@ -1,5 +1,8 @@
 
 whisk {
+  cluster {
+    use-cluster-bootstrap: false
+  }
   loadbalancer {
     invoker-busy-threshold: 4
     blackbox-fraction: 10%
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/SeedNodesProvider.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/SeedNodesProvider.scala
deleted file mode 100644
index be630a4234..0000000000
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/SeedNodesProvider.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer
-
-import akka.actor.Address
-
-import scala.collection.immutable.Seq
-
-trait SeedNodesProvider {
-  def getSeedNodes(): Seq[Address]
-}
-
-class StaticSeedNodesProvider(seedNodes: String, actorSystemName: String) extends SeedNodesProvider {
-  def getSeedNodes(): Seq[Address] = {
-    seedNodes
-      .split(' ')
-      .flatMap { rawNodes =>
-        val ipWithPort = rawNodes.split(":")
-        ipWithPort match {
-          case Array(host, port) => Seq(Address("akka.tcp", actorSystemName, host, port.toInt))
-          case _                 => Seq.empty[Address]
-        }
-      }
-      .toIndexedSeq
-  }
-}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index d10f03e08e..e99b972678 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -25,6 +25,8 @@ import akka.actor.{Actor, ActorSystem, Cancellable, Props}
 import akka.cluster.ClusterEvent._
 import akka.cluster.{Cluster, Member, MemberStatus}
 import akka.event.Logging.InfoLevel
+import akka.management.AkkaManagement
+import akka.management.cluster.bootstrap.ClusterBootstrap
 import akka.stream.ActorMaterializer
 import org.apache.kafka.clients.producer.RecordMetadata
 import pureconfig._
@@ -57,13 +59,12 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
   private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
 
   /** Build a cluster of all loadbalancers */
-  private val seedNodesProvider = new StaticSeedNodesProvider(config.controllerSeedNodes, actorSystem.name)
-  private val seedNodes = seedNodesProvider.getSeedNodes()
-
-  private val cluster: Option[Cluster] = if (seedNodes.nonEmpty) {
-    val cluster = Cluster(actorSystem)
-    cluster.joinSeedNodes(seedNodes)
-    Some(cluster)
+  private val cluster: Option[Cluster] = if (loadConfigOrThrow[ClusterConfig](ConfigKeys.cluster).useClusterBootstrap) {
+    AkkaManagement(actorSystem).start()
+    ClusterBootstrap(actorSystem).start()
+    Some(Cluster(actorSystem))
+  } else if (loadConfigOrThrow[Seq[String]]("akka.cluster.seed-nodes").nonEmpty) {
+    Some(Cluster(actorSystem))
   } else {
     None
   }
@@ -305,7 +306,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
     logging: Logging,
     materializer: ActorMaterializer): LoadBalancer = new ShardingContainerPoolBalancer(whiskConfig, instance)
 
-  def requiredProperties: Map[String, String] = kafkaHosts ++ Map(controllerSeedNodes -> null)
+  def requiredProperties: Map[String, String] = kafkaHosts
 
   /** Generates a hash based on the string representation of namespace and action */
   def generateHash(namespace: EntityName, action: FullyQualifiedEntityName): Int = {
@@ -471,6 +472,13 @@ case class ShardingContainerPoolBalancerState(
   }
 }
 
+/**
+ * Configuration for the cluster created between loadbalancers.
+ *
+ * @param useClusterBootstrap Whether to form the cluster via Akka Cluster Bootstrap instead of configured seed nodes
+ */
+case class ClusterConfig(useClusterBootstrap: Boolean)
+
 /**
  * Configuration for the sharding container pool balancer.
  *
diff --git a/docs/deploy.md b/docs/deploy.md
index 1ad8456af5..a1abad0b5b 100644
--- a/docs/deploy.md
+++ b/docs/deploy.md
@@ -4,6 +4,13 @@ This page documents configuration options that should be considered when deployi
 
 The system can be configured to use Akka clustering to manage the distributed state of the Contoller's load balancing algorithm.  This imposes the following constraints on a deployment
 
+## Cluster setup
+
+To set up a cluster, the controllers need to be able to discover each other. There are two basic ways to achieve this:
+
+1. Provide the so-called **seed-nodes** explicitly on deployment. Essentially you have a static list of possible seed nodes which are used to build a cluster. In an Ansible-based deployment, they are determined for you from the `hosts` file. In any other deployment model, the `CONFIG_akka_cluster_seedNodes_$i` variables (one per seed node, where `$i` is the zero-based index) will need to be provided according to the [akka cluster documentation](https://doc.akka.io/docs/akka/2.5/cluster-usage.html#joining-to-seed-nodes).
+2. Discover the nodes from an external service. This is built upon [akka-management](https://developer.lightbend.com/docs/akka-management/current/) and by default [Kubernetes](https://developer.lightbend.com/docs/akka-management/current/discovery.html#discovery-method-kubernetes-api) and [Mesos (Marathon)](https://developer.lightbend.com/docs/akka-management/current/discovery.html#discovery-method-marathon-api) are supported. You can refer to the respective documentation above to configure discovery accordingly.
+
 
 ## Controller nodes must have static IPs/Port combination.
 
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/SeedNodesProviderTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/SeedNodesProviderTests.scala
deleted file mode 100644
index ef4ae850c0..0000000000
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/SeedNodesProviderTests.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer.test
-
-import akka.actor.Address
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{FlatSpec, Matchers}
-import whisk.core.loadBalancer.StaticSeedNodesProvider
-
-@RunWith(classOf[JUnitRunner])
-class SeedNodesProviderTests extends FlatSpec with Matchers {
-
-  val actorSystemName = "controller-actor-system"
-  val host = "192.168.99.100"
-  val port = 8000
-  val oneFakeSeedNode = s"$host:$port"
-  val twoFakeSeedNodes = s"$host:$port $host:${port + 1}"
-  val twoFakeSeedNodesWithoutWhitespace = s"$host:$port$host:${port + 1}"
-
-  behavior of "StaticSeedNodesProvider"
-
-  it should "return a sequence with a single seed node" in {
-    val seedNodesProvider = new StaticSeedNodesProvider(oneFakeSeedNode, actorSystemName)
-    val seqWithOneSeedNode = seedNodesProvider.getSeedNodes()
-    seqWithOneSeedNode shouldBe IndexedSeq(Address("akka.tcp", actorSystemName, host, port))
-  }
-  it should "return a sequence with more then one seed node" in {
-    val seedNodesProvider = new StaticSeedNodesProvider(twoFakeSeedNodes, actorSystemName)
-    val seqWithTwoSeedNodes = seedNodesProvider.getSeedNodes()
-    seqWithTwoSeedNodes shouldBe IndexedSeq(
-      Address("akka.tcp", actorSystemName, host, port),
-      Address("akka.tcp", actorSystemName, host, port + 1))
-  }
-  it should "return an empty sequence if seed nodes are provided in the wrong format" in {
-    val seedNodesProvider = new StaticSeedNodesProvider(twoFakeSeedNodesWithoutWhitespace, actorSystemName)
-    val noneResponse = seedNodesProvider.getSeedNodes()
-    noneResponse shouldBe IndexedSeq.empty[Address]
-  }
-  it should "return an empty sequence if no seed nodes specified" in {
-    val seedNodesProvider = new StaticSeedNodesProvider("", actorSystemName)
-    val noneResponse = seedNodesProvider.getSeedNodes()
-    noneResponse shouldBe IndexedSeq.empty[Address]
-  }
-
-}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services