You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/04/17 23:23:44 UTC

[GitHub] tysonnorris closed pull request #2992: create SPI for SeedNodesProvider; MarathonSeedNodesProvider to provid…

tysonnorris closed pull request #2992: create SPI for SeedNodesProvider; MarathonSeedNodesProvider to provid…
URL: https://github.com/apache/incubator-openwhisk/pull/2992
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 4c36319523..9f7448312e 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -12,7 +12,10 @@ akka.http {
         max-open-requests = 512
     }
 }
-
+whisk.cluster.discovery {
+    # interval (in seconds) to wait between health checks during mesos seed node discovery
+    marathon-check-interval-seconds = 1
+}
 #kamon related configuration
 kamon {
 
diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf
index 8048d43bd4..9821b8fb77 100644
--- a/common/scala/src/main/resources/reference.conf
+++ b/common/scala/src/main/resources/reference.conf
@@ -1,4 +1,4 @@
-whisk.spi {
+whisk.spi{
   ArtifactStoreProvider = whisk.core.database.CouchDbStoreProvider
   MessagingProvider = whisk.connector.kafka.KafkaMessagingProvider
   ContainerFactoryProvider = whisk.core.containerpool.docker.DockerContainerFactoryProvider
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 7e6148038d..fb7f5f6769 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -215,6 +215,7 @@ object WhiskConfig {
 object ConfigKeys {
   val loadbalancer = "whisk.loadbalancer"
 
+  val cluster = "whisk.cluster"
   val couchdb = "whisk.couchdb"
   val kafka = "whisk.kafka"
   val kafkaCommon = s"$kafka.common"
diff --git a/core/controller/build.gradle b/core/controller/build.gradle
index 0b904440af..cf1197bf1e 100644
--- a/core/controller/build.gradle
+++ b/core/controller/build.gradle
@@ -15,6 +15,8 @@ repositories {
 
 dependencies {
     compile 'com.typesafe.akka:akka-distributed-data_2.11:2.5.4'
+    compile "com.lightbend.akka.management:akka-management-cluster-bootstrap_2.11:0.10.0"
+    compile "com.lightbend.akka.discovery:akka-discovery-marathon-api_2.11:0.10.0"
     compile "org.scala-lang:scala-library:${gradle.scala.version}"
     compile project(':common:scala')
 }
diff --git a/core/controller/src/main/resources/application.conf b/core/controller/src/main/resources/application.conf
index bbc298b1e3..1daed6ee5f 100644
--- a/core/controller/src/main/resources/application.conf
+++ b/core/controller/src/main/resources/application.conf
@@ -10,6 +10,9 @@ whisk {
   controller {
     protocol: http
   }
+  cluster {
+    use-cluster-bootstrap: false #true enables use of Akka Management + Akka Cluster Bootstrap; otherwise, static list of seed nodes is required
+  }
 }
 
 # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
@@ -81,3 +84,55 @@ akka {
     #distributed-data.notify-subscribers-interval = 0.01
   }
 }
+akka {
+  discovery {
+    #method = marathon-api
+    marathon-api {
+      class = akka.discovery.marathon.MarathonApiSimpleServiceDiscovery
+
+      # URL for getting list of apps from Marathon. Verified on OSS DC/OS 1.8, 1.9
+      app-api-url = "http://192.168.99.100:8080/v2/apps"
+
+      # The name of the akka management port - this cannot have underscores or dashes (env var name)
+      app-port-name = "akkamgmthttp"
+
+      # Used to find other apps run by Marathon. This will be passed as the label query string parameter
+      # to the app-api-url defined above.
+      # `%s` will be replaced with the configured effective name, which defaults to the actor system name
+      app-label-query = "ACTOR_SYSTEM_NAME==%s"
+    }
+  }
+
+  management {
+    http {
+      # The hostname where the HTTP Server for Http Cluster Management will be started.
+      # This defines the interface to use.
+      # InetAddress.getLocalHost.getHostAddress is used if not overridden or empty
+
+      hostname = ${?HOST}
+
+      port = 19999
+      port = ${?PORT_19999}
+
+      bind-hostname = 0.0.0.0
+      bind-port = 19999
+    }
+
+    cluster.bootstrap {
+
+      # Configuration for the first phase of bootstrapping, during which contact points are discovered
+      # using the configured service discovery mechanism.
+      contact-point-discovery {
+
+        # Marathon API discovery uses effective-name when it's defined.
+        #
+        # Marathon API discovery substitutes this value into the `app-label-query` query
+        #
+        # It should match with application LABEL value declared in Marathon description.
+        #
+        effective-name= "whisk-controller-docker-app-label"
+      }
+
+    }
+  }
+}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/SeedNodesProvider.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ClusterProvider.scala
similarity index 62%
rename from core/controller/src/main/scala/whisk/core/loadBalancer/SeedNodesProvider.scala
rename to core/controller/src/main/scala/whisk/core/loadBalancer/ClusterProvider.scala
index be630a4234..eaae0858e6 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/SeedNodesProvider.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ClusterProvider.scala
@@ -17,15 +17,32 @@
 
 package whisk.core.loadBalancer
 
+import akka.actor.ActorSystem
 import akka.actor.Address
-
+import akka.cluster.Cluster
+import akka.management.AkkaManagement
+import akka.management.cluster.bootstrap.ClusterBootstrap
+import pureconfig._
 import scala.collection.immutable.Seq
+import whisk.core.ConfigKeys
+import whisk.core.WhiskConfig
+
+case class ClusterProviderConfig(useClusterBootstrap: Boolean)
 
-trait SeedNodesProvider {
-  def getSeedNodes(): Seq[Address]
+object ClusterProvider {
+  def joinCluster(config: WhiskConfig, actorSystem: ActorSystem) = {
+    val clusterConfig: ClusterProviderConfig = loadConfigOrThrow[ClusterProviderConfig](ConfigKeys.cluster)
+    if (clusterConfig.useClusterBootstrap) {
+      AkkaManagement(actorSystem).start()
+      ClusterBootstrap(actorSystem).start()
+    } else {
+      Cluster(actorSystem).joinSeedNodes(
+        new StaticSeedNodesProvider(config.controllerSeedNodes, actorSystem.name).getSeedNodes())
+    }
+  }
 }
 
-class StaticSeedNodesProvider(seedNodes: String, actorSystemName: String) extends SeedNodesProvider {
+class StaticSeedNodesProvider(seedNodes: String, actorSystemName: String) {
   def getSeedNodes(): Seq[Address] = {
     seedNodes
       .split(' ')
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
index 570281d658..3efc6418a5 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
@@ -21,12 +21,10 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.ThreadLocalRandom
 
 import akka.actor.{ActorSystem, Props}
-import akka.cluster.Cluster
 import akka.pattern.ask
 import akka.stream.ActorMaterializer
 import akka.util.Timeout
 import org.apache.kafka.clients.producer.RecordMetadata
-import pureconfig._
 import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
 import whisk.core.WhiskConfig._
 import whisk.core.connector._
@@ -65,9 +63,8 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)
       new LocalLoadBalancerData()
     } else {
 
-      /** Specify how seed nodes are generated */
-      val seedNodesProvider = new StaticSeedNodesProvider(config.controllerSeedNodes, actorSystem.name)
-      Cluster(actorSystem).joinSeedNodes(seedNodesProvider.getSeedNodes())
+      /** Specify how cluster is joined */
+      ClusterProvider.joinCluster(config, actorSystem)
       new DistributedLoadBalancerData()
     }
   }
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 1f5f398a87..d69c08999b 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -57,9 +57,8 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
   private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
 
   /** Build a cluster of all loadbalancers */
-  val seedNodesProvider = new StaticSeedNodesProvider(config.controllerSeedNodes, actorSystem.name)
+  ClusterProvider.joinCluster(config, actorSystem)
   val cluster = Cluster(actorSystem)
-  cluster.joinSeedNodes(seedNodesProvider.getSeedNodes())
 
   /** Used to manage an action for testing invoker health */
   private val entityStore = WhiskEntityStore.datastore()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services